zookeeper
一、初识
1.1 简介
- Zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。
- 从设计模式角度来理解,Zookeeper是一个基于观察者模式设计的分布式服务管理框架。
- 它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化 ,Zookeeper就将负责通知已经在Zookeeper上注册
的那些观察者做出相应的反应。
1.2 特点
- Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
- 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。
- 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
- 更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行。
- 数据更新原子性,一次数据更新要么成功,要么失败。
- 实时性,在一定时间范围内,Client能读到最新数据。
1.3 数据结构
- ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,每个节点称做一
个ZNode。 - 每一个ZNode默认能够存储1M B的数据,每个ZNode都可以通过其路径唯一标识。
1.4 应用场景
统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。 例如:IP不容易记住,而域名容易记住。
统一配置管理
分布式环境下,配置文件同步非常常见。
- 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。
- 对配置文件修改后,希望能够快速同步到各个节点上。
配置管理可交由ZooKeeper实现
- 可将配置信息写入ZooKeeper上的一个Znode。
- 各个客户端服务器监听这个Znode。
- 一 旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。
统一集群管理
分布式环境中,实时掌握每个节点的状态是必要的,有时需要根据节点实时状态做出一些调整。
ZooKeeper可以实现实时监控节点状态变化
- 可将节点信息写入ZooKeeper上的一个ZNode。
- 监听这个ZNode可获取它的实时状态变化。
服务器节点动态上下线
软负载均衡
在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求
二、安装
2.1 本地模式
1.安装
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz -C /opt
mv /opt/apache-zookeeper-3.6.2-bin /opt/zookeeper-3.6.2
2. 配置修改
# 复制配置样本
mv /opt/zookeeper-3.6.2/conf/zoo_sample.cfg /opt/zookeeper-3.6.2/conf/zoo.cfg
# 创建数据目录
mkdir /opt/zookeeper-3.6.2/zkData
#修改dataDir路径
vim /opt/zookeeper-3.6.2/conf/zoo.cfg
# dataDir=/opt/zookeeper-3.6.2/zkData
3. 启动
#3.1 启动
/opt/zookeeper-3.6.2/bin/zkServer.sh start
#3.2 检测
jps
#3.3 查看状态
/opt/zookeeper-3.6.2/bin/zkServer.sh status
# 4001 QuorumPeerMain
#3.4 启动客户端
/opt/zookeeper-3.6.2/bin/zkCli.sh
#3.5 退出客户端
quit
#3.6 停止zookeeper
/opt/zookeeper-3.6.2/bin/zkServer.sh stop
4. 配置参数
- tickTime =2000:通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒,用于心跳机制。
- initLimit =10:集群中的Follower与Leader之间初始连接时能容忍的最多心跳数。
- syncLimit =5:集群中的Follower与Leader之间最大响应心跳数,超时则从服务器列表中删除Follwer。
- dataDir:数据文件目录+数据持久化路径
- clientPort =2181:客户端连接端口
2.2 分布式模式
1. 单机部署
按2.1的步骤,在3台linux中部署zookeeper。
2. 新增标识
在3台linux中分别给zookeeper增加myid文件,里面分别放zookeeper的编号1,2,3
vim /opt/zookeeper-3.6.2/zkData/myid
3. 在每个cfg配置文件中添加集群节点
vim /opt/zookeeper-3.6.2/conf/zoo.cfg
#增加如下配置
#######################cluster##########################
# 2888端口:服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
# 3888端口:万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,用来执行选举时服务器相互通信的端口。
# 记得在防火墙开启2128,2888,3888节点
server.1=192.168.100.5:2888:3888
server.2=192.168.100.6:2888:3888
server.3=hadoop104:2888:3888
4. 分别启动zookeeper
三、原理
3.1 选举机制
- 半数机制:集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器。
- Zookeeper 工作时有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。
演示
假设有五台服务器组成的 Zookeeper 集群,它们的 id 从 1-5,同时它们都是新启动的,也就是没有历史数据。
- 服务器 1 启动,发起一次选举。
1.1 服务器 1 投自己一票。
1.2 此时服务器 1 票数一票,不够半数以上(3 票),选举无法完成,服务器 1 状态保持为 LOOKING; - 服务器 2 启动,再发起一次选举。
2.1 服务器 1 和 2 分别投自己一票并交换选票信息:
2.2 此时服务器 1 发现服务器 2 的 ID 比自己目前投票推举的(服务器 1)大,更改选票为推举服务器 2。
2.3 此时服务器 1 票数 0 票,服务器 2 票数 2 票,没有半数以上结果,选举无法完成,服务器 1,2 状态保持 LOOKING - 服务器 3 启动,发起一次选举并交换选票信息。
3.1 此时服务器 1 和 2 都会更改选票为服务器 3。
3.2 此次投票结果:服务器 1 为 0 票,服务器 2 为 0 票,服务器 3 为 3 票。此时服务器 3 的票数已经超过半数,服务器 3 当选 Leader。
3.3 服务器 1,2 更改状态为 FOLLOWING,服务器 3 更改状态为 LEADING; - 服务器 4 启动,发起一次选举。
4.1 此时服务器 1,2,3 已经不是 LOOKING 状态,不会更改选票信息。
4.2 交换选票信息结果:服务器 3 为 3 票,服务器 4 为 1 票。
4.3 此时服务器 4 服从多数,更改选票信息为服务器 3,并更改状态为 FOLLOWING; - 服务器 5 启动,同 4 一样当FOLLOWING。
3.2 节点类型
- 持久节点(Persistent):客户端和服务器端断开连接后,创建的节点不删除。
- 短暂节点(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除,也称临时节点。
- 顺序标识:创建znode时可以设置顺序标识,znode名称后会附加一个顺序号,顺序号是一个单调递增的计数器,由父节点维护。
3.1 注:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
3.3 监听器原理
- 在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
- 通过connect线程将注册的监听事件发送给Zookeeper。
2.1 监听节点数据的变化 get path [watch]
2.2 监听子节点增减的变化 ls path [watch] - 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
- Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
- listener线程内部调用了process()方法
3.5 写数据流程
- Client 向 ZooKeeper 的Server1 上写数据,发送一个写请求。
- 如果Server1不是Leader,那么Server1 会把接受到的请求进一步转发给Leader,因为每个ZooKeeper的Server里面有一个是Leader。
- Leader 会将写请求广播给各个Server,比如Server1和Server2,各个Server会将该写请求加入待写队列,并向Leader发送成功信息。
- 当Leader收到半数以上 Server 的成功信息,说明该写操作可以执行。Leader会向各个Server 发送提交信息,各个Server收到信息后会落实队列里的写请求,此时写成功。
- Server1会进一步通知 Client 数据写成功了,这时就认为整个写操作成功。
四、实战
4.1 客户端命令
启动客户端
/opt/zookeeper-3.6.2/bin/zkCli.sh
显示客户端命令
help
查看当前 znode 中所包含的内容
ls /
查看当前节点详细数据
ls -s /
- czxid:创建节点的事务ID(zxid),每个修改都有唯一的 zxid,按发生顺序递增。
- ctime:znode创建的时间戳
- mzxid:znode最后更新的事务ID
- mtime:znode最后修改的时间戳
- pZxid:znode最后更新的子节点事务ID
- cversion:znode子节点修改次数
- dataversion:znode数据变化号
- aclVersion:znode访问控制列表的变化号
- ephemeralOwner:临时节点拥有者的 session id。
- dataLength:znode的数据长度
- numChildren:znode子节点数量
创建普通节点
create /sanguo "jinlian"
获得节点的值
get /sanguo
创建短暂节点
create -e /sanguo/wuguo
# 退出当前客户端然后再重启客户端,再次查看根目录下短暂节点已经删除
创建带序号的节点
create -s /sanguo/weiguo/xiaoqiao "jinlian"
修改节点数据值
set /sanguo/weiguo "simayi"
节点的值变化监听
get /sanguo watch
#在另一台zookeeper上修改/sanguo的值
set /sanguo "xisi"
子节点变化监听(路径变化)
ls /sanguo watch
# 在另一台zookeeper上创建/sanguo子节点
create /sanguo/jin "simayi"
删除节点
delete /sanguo/jin
递归删除节点
rmr /sanguo/shuguo
查看节点状态
stat /sanguo
4.2 API调用
- 搭建springboot项目
- 配置日志logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="logger.path" value="D:/logs" />
<!-- 彩色日志 -->
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<!--输出到控制台-->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>info</level>
</filter>
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<!--输出到文件-->
<!-- 时间滚动输出 level为 DEBUG 日志 -->
<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${logger.path}/log_debug.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志归档 -->
<fileNamePattern>${logger.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录debug级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>debug</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 INFO 日志 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${logger.path}/log_info.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天日志归档路径以及格式 -->
<fileNamePattern>${logger.path}/info/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录info级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>info</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 WARN 日志 -->
<appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${logger.path}/log_warn.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${logger.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录warn级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>warn</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 ERROR 日志 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${logger.path}/log_error.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${logger.path}/error/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录ERROR级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!--
root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性
level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
不能设置为INHERITED或者同义词NULL。默认是DEBUG
可以包含零个或多个元素,标识这个appender将会添加到这个logger。
-->
<root level="info">
<appender-ref ref="CONSOLE" />
<appender-ref ref="DEBUG_FILE" />
<appender-ref ref="INFO_FILE" />
<appender-ref ref="WARN_FILE" />
<appender-ref ref="ERROR_FILE" />
</root>
</configuration>
- 创建zookeeper客户端
package site.javaee.zookeeper;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.List;
@Slf4j
class ZookeeperTest {
private static ZooKeeper zkClient = null;
//创建zookeeper客户端
public static void init() throws Exception {
String connectString = "zoo1:2181";
int sessionTimeout = 2000;
zkClient = new ZooKeeper(connectString, sessionTimeout, (event) -> {
// 收到事件通知后的回调函数(用户的业务逻辑)
System.out.println(event.getType() + "--" +
event.getPath());
// 再次启动监听
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
e.printStackTrace();
}
});
}
//创建节点
public static void createNode() throws KeeperException, InterruptedException {
String path = zkClient.create("/javaee", "WalkerDogW".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}
//获取子节点并监听节点变化
public static void getChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
//判断zNode是否存在
public static void exist() throws Exception {
Stat stat = zkClient.exists("/eclipse", false);
System.out.println(stat == null ? "not exist" : "exist");
}
public static void main(String[] args) throws Exception {
log.info("gg");
init();
createNode();
}
}
Q.E.D.
Comments | 0 条评论