本文介绍如何在 Flink 中使用 Paimon。

准备 Paimon Jar 文件

Paimon 目前支持 Flink 1.17、1.16、1.15 和 1.14。推荐使用最新的 Flink 版本以获得更好的体验。

下载对应版本的 jar 文件:

也可以选择从源代码手动构建 jar 文件。

要从源代码构建,需要先把代码克隆到本地,Github:https://github.com/apache/incubator-paimon.git。

然后使用以下命令构建:

1
mvn clean install -DskipTests

构建成功后,可以在 ./paimon-flink/paimon-flink-<flink-version>/target/paimon-flink-<flink-version>-0.4.0-incubating.jar 中找到 jar 文件。

快速入门

1)下载 Flink

如果还没有下载 Flink,可以通过 https://flink.apache.org/downloads.html 下载,然后使用以下命令进行解压:

1
tar -xzf flink-*.tgz

2)将 Paimon 的 jar 文件拷贝到 Flink 主目录的 lib 目录下

1
cp paimon-flink-*.jar <FLINK_HOME>/lib/

3)将 Hadoop 的 jar 文件拷贝到 Flink 主目录的 lib 目录下

如果机器处于 Hadoop 环境中,请确保环境变量 HADOOP_CLASSPATH 的值包括常用 Hadoop 库的路径,你不需要使用下面预装的 Hadoop jar 文件。

下载 Pre-bundled Hadoop jar 文件,并将该 jar 文件拷贝到 Flink 目录的 lib 目录下。

1
cp flink-shaded-hadoop-2-uber-*.jar <FLINK_HOME>/lib/

4)启动 Flink 本地集群

为了可以同时运行多个 Flink 作业,需要修改/conf/flink-conf.yaml 中的集群配置:

1
taskmanager.numberOfTaskSlots: 2

运行 Flink 自带的 bash 脚本,启动本地集群:

1
<FLINK_HOME>/bin/start-cluster.sh

在浏览器打开链接 http://localhost:8081/ 查看 Flink 仪表盘,可以看到集群已启动并正在运行。

启动 Flink SQL 客户端来执行 SQL 脚本:

1
<FLINK_HOME>/bin/sql-client.sh

5)创建目录和表

依次执行以下 SQL 语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- if you're trying out Paimon in a distributed environment,
-- the warehouse path should be set to a shared file system, such as HDFS or OSS
CREATE CATALOG my_catalog WITH (
'type'='paimon',
'warehouse'='file:/tmp/paimon'
);

USE CATALOG my_catalog;

-- create a word count table
CREATE TABLE word_count (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
);

6)写入数据

依次执行以下 SQL 语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1'
);

-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;

7)OLAP 查询

依次执行以下 SQL:

1
2
3
4
5
6
7
8
9
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- olap query the table
SELECT * FROM word_count;

可以多执行几次上面的查询 SQL 语句,并观察结果的变化。

8)流式查询

1
2
3
4
5
6
-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- track the changes of table and calculate the count interval statistics
SELECT `interval`, COUNT(*) AS interval_cnt FROM
(SELECT cnt / 10000 AS `interval` FROM word_count) GROUP BY `interval`;

9)退出

打开 Flink 仪表盘上正在运行作业列表上的作业,进入作业详情页面,点击 Cancel Job 按钮,取消作业。

执行以下 SQL 语句,退出 Flink SQL 客户端:

1
2
3
4
5
-- uncomment the following line if you want to drop the dynamic table and clear the files
-- DROP TABLE word_count;

-- exit sql-client
EXIT;

停止 Flink 本地集群:

1
./bin/stop-cluster.sh

Flink 数据类型请参阅:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/types/。

支持所有 Flink 数据类型,除了

  • 不支持 MULTISET 类型;
  • 不支持将 MAP 作为主键。

(END)