Flink 中的 History Server 是一个非常有用的组件,可以在相应的 Flink 集群关闭之后查询已完成作业的统计信息。并且,它还提供了一个 REST API,可接受 HTTP 请求并以 JSON 数据作为响应。本文将详细介绍 Flink History Server 的工作原理和主要功能。

一、History Server工作原理

Apache Flink 自带了一个 HistoryServer 进程,它是一个独立的 Web 服务器。HistoryServer 不参与 Flink 作业执行,仅用于展示作业的历史信息。它的工作原理如下:

  1. JobManager 会将已完成的作业的信息以存档文件的形式写入 HDFS 或者其他持久存储中。
  2. HistoryServer 读取这些存档文件,并提供 Web 界面展示其信息内容。
  3. 用户通过 HistoryServer 的 Web UI 查看作业记录和运行数据。

每个作业完成后,JobManager 会把该作业的信息打包成一个个 JSON 格式的归档文件,包括作业配置信息、作业执行过程中的 Checkpoint 信息、已完成作业的统计数据等,并存储在 HDFS 或者其他文件系统中。HistoryServer 会定期轮询这些归档文件所在的目录,所包含的存档会被下载缓存在本地文件系统中,以便在 Web UI 展示。

二、History Server的主要功能

通过 History Server Web UI,用户可以查看 Flink 已完成作业的各种历史信息,主要包含以下方面:

  1. 查看所有已完成作业列表,以列表的形式展示了每一次作业执行的概览信息。
  2. 作业配置信息,包括并行度设置、系统参数等。
  3. 作业计划(Execution Graph),以图的形式展示作业的拓扑结构。
  4. 作业内所有任务(Vertex)的统计数据,如每个任务的运行时长、发送记录数等。
  5. 作业失败或异常信息,可以指导定位故障。
  6. 作业执行过程中的 Checkpoint 信息,如 Checkpoint 失败次数、Checkpoint 统计信息等。
  7. 支持以图表的形式查看作业时间线(Timeline),可以很直观地观察作业的执行过程。

三、History Server配置

JobManager

根据 History Server 的工作原理,我们知道 JobManager 负责已完成作业的归档工作,即将已完成的作业的信息写入到 HDFS 或者其他持久存储中,以供 History Server 查询展示。所以我们需要在 flink-conf.yaml 文件中配置已完成作业的存档目录(通过 jobmanager.archive.fs.dir 设置),如:

1
2
# 已完成作业信息的存档目录
jobmanager.archive.fs.dir: hdfs:///completed-jobs

HistoryServer

已完成作业的存档目录配置好后,接下来就是配置 HistoryServer。通过设置 historyserver.archive.fs.dir 告诉 HistoryServer 需要从哪些目录读取已完成作业的信息。该配置项通常与 jobmanager.archive.fs.dir 一致,即 JobManager 将已完成的作业信息存放在哪里,HistoryServer 就去哪里读。如果目录有多个,可以使用英文逗号分隔。

1
2
# 监视以下目录中已完成的作业
historyserver.archive.fs.dir: hdfs:///completed-jobs

此外,还可以通过配置项 historyserver.archive.fs.refresh-interval 设置 HistoryServer 定期读取目录的时间间隔(单位为毫秒),如设置每 10 秒刷新一次:

1
2
# 每 10 秒刷新一次
historyserver.archive.fs.refresh-interval: 10000

HistoryServer 会将目录所包含的存档下载并缓存在本地文件系统中,本地目录可以通过 historyserver.web.tmpdir 配置。

HistoryServer 的 WebUI 监听的地址和端口,可通过以下配置项 historyserver.web.addresshistoryserver.web.port 设置:

1
2
3
4
5
# HistoryServer 的监听地址
historyserver.web.address: 0.0.0.0

# HistoryServer 的监听端口
historyserver.web.port: 8082

配置参数

配置项 默认值 类型 说明
historyserver.archive.clean-expired-jobs false Boolean 是否应清理不再存在于 historyserver.archive.fs.dir 的作业。
historyserver.archive.fs.dir (none) String 以逗号分隔的目录列表,用于获取存档作业。HistoryServer 将监控这些目录中的存档作业。
historyserver.archive.fs.refresh-interval 10000 Long 刷新存档工作目录的间隔(毫秒)。
historyserver.archive.retained-jobs -1 Integer historyserver.archive.fs.dir 定义的每个存档目录中要保留的作业的最大数量。如果设置为 -1(默认),则存档数量没有限制。如果设置为 0 或小于 -1,HistoryServer 将抛出 IllegalConfigurationException。
historyserver.log.jobmanager.url-pattern (none) String JobManager 的日志 URL 模式。HistoryServer 将从中生成实际 URL,并将特殊占位符 替换为作业 ID。仅支持 http / https 方案。
historyserver.log.taskmanager.url-pattern (none) String 任务管理器日志 URL 的模式。HistoryServer 将从中生成实际 URL,并将特殊占位符 分别替换为 job 和 TaskManager 的 id。仅支持 http / https 方案。
historyserver.web.address (none) String HistoryServer WebUI 地址。
historyserver.web.port 8082 Integer HistoryServer WebUI 端口。
historyserver.web.refresh-interval 10000 Long HistoryServer 网络前端的刷新间隔(毫秒)。
historyserver.web.ssl.enabled false Boolean 启用 HTTP 访问 HistoryServer 网络前端。只有当全局 SSL 标志 security.ssl.enabled 设为 true 时才适用。
historyserver.web.tmpdir (none) String HistoryServer REST API 用于临时文件的本地目录。

四、启动History Server

在配置好 HistoryServer 和 JobManager 之后,可以使用以下的脚本来启动和停止 HistoryServer:

启动 HistoryServer

1
2
# 启动 HistoryServer
bin/historyserver.sh start

停止 HistoryServer

1
2
# 停止 HistoryServer
bin/historyserver.sh stop

需要注意的是:HistoryServer 目前只能作为独立的进程运行。

小结

Flink 中的 History Server 是一款非常实用且强大的组件,它通过记录和展示 Flink 作业的历史运行信息,为审计和调试作业提供了重要的手段。该工具采用独立的 Web 服务器,不参与作业执行,仅用于展示作业的历史信息。History Server 的工作原理是 JobManager 将完成作业的信息存档到 HDFS 或其他持久存储,然后 History Server 读取并下载缓存到本地文件系统中,以提供 Web 界面展示。

通过 History Server 的 Web UI,我们可以轻松查看已完成作业列表、任务统计数据、作业失败异常信息、Checkpoint 信息等。借助 History Server,我们可以审计已完成的作业,分析和优化作业性能,定位故障原因,是非常必要的辅助工具。

以上是 Apache Flink History Server 的简要介绍,希望对大家有所帮助。

(END)