本文将深入探讨各种文件操作对文件产生的影响。文章提供具体的例子和实用技巧,说明 Paimon 是如何有效管理文件的。此外,通过对提交(commit)和压缩(compact)等操作的深入剖析,旨在加深对文件的创建和更新的理解。

先决条件

在深入阅读本文之前,本文假设你已经了解了以下内容:

  • Paimon 基本概念;
  • Paimon 文件布局;
  • 如何在 Flink 中使用 Paimon。

创建目录

通过 ./sql-client.sh 启动 Flink SQL 客户端,并逐一执行以下语句来创建 Paimon 目录:

1
2
3
4
5
6
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = 'file:///tmp/paimon'
);

USE CATALOG paimon;

执行该创建目录语句会在给定的路径 file:///tmp/paimon 创建一个目录:

1
2
$ ls /tmp/paimon
default.db

创建表

执行下面的创建表语句,创建一个有 3 个字段的 Paimon 表:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE T (
id BIGINT,
a INT,
b STRING,
dt STRING COMMENT 'timestamp string in format yyyyMMdd',
PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'write-mode' = 'change-log',
'merge-engine' = 'deduplicate'
);

执行该创建表语句会在路径 /tmp/paimon/default.db/T 下创建 Paimon 表 T,其模式存储在 /tmp/paimon/default.db/T/schema/schema-0 中。

使用 tree 命令以树状格式列出当前 Catalog 下的内容:

1
2
3
4
5
6
7
8
$ tree /tmp/paimon/
/tmp/paimon/
└── default.db
└── T
└── schema
└── schema-0

3 directories, 1 file

向表中插入记录

在 Flink SQL 中运行以下插入语句:

1
INSERT INTO T VALUES (1, 10001, 'varchar00001', '20230501');

一旦 Flink 任务完成,记录就会通过一次成功的 commit 写入 Paimon 表。用户可以通过执行查询 SELECT * FROM T 来验证这些记录的可见性,这将返回一行记录:

1
2
id           a                              b                             dt
1 10001 varchar00001 20230501

提交过程会在路径 /tmp/paimon/default.db/T/snapshot/snapshot-1 下创建一个快照。快照 snapshot-1 的最终文件布局如下所述:

snapshot-1

使用 tree 命令以树状格式列出当前 Catalog 下的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ tree /tmp/paimon/
/tmp/paimon/
└── default.db
└── T
├── dt=20230501
│   └── bucket-0
│   └── data-c4f3cde1-8759-4f69-abc2-da6bea4911a1-0.orc
├── manifest
│   ├── manifest-401272a3-05f2-4d51-8a3d-27ecee17ca0e-0
│   ├── manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-0
│   └── manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-1
├── schema
│   └── schema-0
└── snapshot
├── EARLIEST
├── LATEST
└── snapshot-1

7 directories, 8 files

使用 vim 命令查看 snapshot-1 的内容:

1
vim /tmp/paimon/default.db/T/snapshot/snapshot-1

可以看到该快照文件内容包含快照的元数据,如清单和模式 ID:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"version" : 3,
"id" : 1,
"schemaId" : 0,
"baseManifestList" : "manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-0",
"deltaManifestList" : "manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-1",
"changelogManifestList" : null,
"commitUser" : "14c360cd-2141-4e0f-b4b7-a16de9c94bbf",
"commitIdentifier" : 9223372036854775807,
"commitKind" : "APPEND",
"timeMillis" : 1686362112672,
"logOffsets" : { },
"totalRecordCount" : 1,
"deltaRecordCount" : 1,
"changelogRecordCount" : 0,
"watermark" : -9223372036854775808
}

请注意,清单列表包含快照的所有更改,baseManifestList 是应用 deltaManifestList 中的更改的基础文件。第一次提交会产生 1 个清单文件,并创建 2 个清单列表(文件名称可能与你的实验中的文件名称不同):

1
2
3
4
5
ll /tmp/paimon/default.db/T/manifest
total 12K
-rw-r--r-- 1 root root 1.7K Jun 10 09:55 manifest-401272a3-05f2-4d51-8a3d-27ecee17ca0e-0
-rw-r--r-- 1 root root 676 Jun 10 09:55 manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-0
-rw-r--r-- 1 root root 794 Jun 10 09:55 manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-1

manifest-401272a3-05f2-4d51-8a3d-27ecee17ca0e-0 是清单文件(manifest 文件),对应上图中的 manifest-1-0,它存储了快照中数据文件的信息。

manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-0baseManifestList(上图中的 manifest-list-1-base),它实际上是空的。

manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-1deltaManifestList(上图中的 manifest-list-1-delta),它包含了对数据文件执行操作的清单条目列表,在本例中为 manifest-1-0

接下来,我们插入多条记录,这些记录分布在不同的分区中,让我们看看会发生什么。

在 Flink SQL 中,执行以下语句:

1
2
3
4
5
6
7
8
9
10
INSERT INTO T VALUES 
(2, 10002, 'varchar00002', '20230502'),
(3, 10003, 'varchar00003', '20230503'),
(4, 10004, 'varchar00004', '20230504'),
(5, 10005, 'varchar00005', '20230505'),
(6, 10006, 'varchar00006', '20230506'),
(7, 10007, 'varchar00007', '20230507'),
(8, 10008, 'varchar00008', '20230508'),
(9, 10009, 'varchar00009', '20230509'),
(10, 10010, 'varchar00010', '20230510');

第二次 commit 后,执行 SELECT * FROM T 将返回 10 条记录:

1
2
3
4
5
6
7
8
9
10
11
id           a                              b                             dt
1 10001 varchar00001 20230501
3 10003 varchar00003 20230503
10 10010 varchar00010 20230510
8 10008 varchar00008 20230508
4 10004 varchar00004 20230504
9 10009 varchar00009 20230509
7 10007 varchar00007 20230507
2 10002 varchar00002 20230502
5 10005 varchar00005 20230505
6 10006 varchar00006 20230506

第二次 commit 后,同样会创建一个新的快照,即 snapshot-2,使用 ls -atR 查看物理文件布局如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
$ ls -atR /tmp/paimon/default.db/T
/tmp/paimon/default.db/T:
snapshot 'dt=20230506' 'dt=20230505' 'dt=20230507' 'dt=20230504' 'dt=20230510' 'dt=20230501' ..
manifest . 'dt=20230502' 'dt=20230509' 'dt=20230508' 'dt=20230503' schema

/tmp/paimon/default.db/T/snapshot:
. LATEST snapshot-2 .. EARLIEST snapshot-1

/tmp/paimon/default.db/T/manifest:
manifest-list-98fad66b-1a62-4657-9127-39e31bd26b88-1 ..
. manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-1
manifest-16b42374-154b-4aaf-8d15-2aa0ba41ade1-0 manifest-401272a3-05f2-4d51-8a3d-27ecee17ca0e-0
manifest-list-98fad66b-1a62-4657-9127-39e31bd26b88-0 manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-0

'/tmp/paimon/default.db/T/dt=20230506':
bucket-0 . ..

'/tmp/paimon/default.db/T/dt=20230506/bucket-0':
data-f416780e-d673-49e3-b3e3-bbe390e9c00c-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230505':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230505/bucket-0':
data-78c70a1f-f51c-4ebd-822d-2f1f4a47ef9f-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230502':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230502/bucket-0':
data-8ccf8d2d-607b-4b9f-bf91-ef6542d41d05-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230507':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230507/bucket-0':
data-001834d2-3129-47da-b8cd-b61eb227d384-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230509':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230509/bucket-0':
data-69f3f4eb-cf80-4741-92b3-9cb396354ac2-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230504':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230504/bucket-0':
data-ae0d5e30-8178-4caa-b311-f39bbf116ed7-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230508':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230508/bucket-0':
data-83161511-dc36-421a-bba7-ce8e46c7d5f8-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230510':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230510/bucket-0':
data-71177b4a-0d97-4994-9447-c541ef61145b-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230503':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230503/bucket-0':
data-4cb7a3f9-2ad1-4f7b-9cf3-4445029244cf-0.orc . ..

'/tmp/paimon/default.db/T/dt=20230501':
.. bucket-0 .

'/tmp/paimon/default.db/T/dt=20230501/bucket-0':
data-c4f3cde1-8759-4f69-abc2-da6bea4911a1-0.orc . ..

/tmp/paimon/default.db/T/schema:
.. . schema-0

使用 tree 命令以树状格式列出当前 Catalog 下的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
$ tree /tmp/paimon/default.db
/tmp/paimon/default.db
└── T
├── dt=20230501
│   └── bucket-0
│   └── data-c4f3cde1-8759-4f69-abc2-da6bea4911a1-0.orc
├── dt=20230502
│   └── bucket-0
│   └── data-8ccf8d2d-607b-4b9f-bf91-ef6542d41d05-0.orc
├── dt=20230503
│   └── bucket-0
│   └── data-4cb7a3f9-2ad1-4f7b-9cf3-4445029244cf-0.orc
├── dt=20230504
│   └── bucket-0
│   └── data-ae0d5e30-8178-4caa-b311-f39bbf116ed7-0.orc
├── dt=20230505
│   └── bucket-0
│   └── data-78c70a1f-f51c-4ebd-822d-2f1f4a47ef9f-0.orc
├── dt=20230506
│   └── bucket-0
│   └── data-f416780e-d673-49e3-b3e3-bbe390e9c00c-0.orc
├── dt=20230507
│   └── bucket-0
│   └── data-001834d2-3129-47da-b8cd-b61eb227d384-0.orc
├── dt=20230508
│   └── bucket-0
│   └── data-83161511-dc36-421a-bba7-ce8e46c7d5f8-0.orc
├── dt=20230509
│   └── bucket-0
│   └── data-69f3f4eb-cf80-4741-92b3-9cb396354ac2-0.orc
├── dt=20230510
│   └── bucket-0
│   └── data-71177b4a-0d97-4994-9447-c541ef61145b-0.orc
├── manifest
│   ├── manifest-16b42374-154b-4aaf-8d15-2aa0ba41ade1-0
│   ├── manifest-401272a3-05f2-4d51-8a3d-27ecee17ca0e-0
│   ├── manifest-list-98fad66b-1a62-4657-9127-39e31bd26b88-0
│   ├── manifest-list-98fad66b-1a62-4657-9127-39e31bd26b88-1
│   ├── manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-0
│   └── manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-1
├── schema
│   └── schema-0
└── snapshot
├── EARLIEST
├── LATEST
├── snapshot-1
└── snapshot-2

24 directories, 21 files

从 snapshot-2 开始,新的文件布局看起来像

snapshot-2

从表中删除记录

现在删除满足条件 dt>=20230503 的记录。使用以下命令删除表 T 中 dt>=20230503 的记录:

1
./bin/flink run ./lib/paimon-flink-action-0.4.0-incubating.jar delete --path file:///tmp/paimon/default.db/T --where "dt >= '20230503'"

执行过程输出的信息如下:

1
2
3
4
5
6
7
./bin/flink run ./lib/paimon-flink-action-0.4.0-incubating.jar delete --path file:///tmp/paimon/default.db/T --where "dt >= '20230503'"
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/mnt/d/opt/flink-1.17.1/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID 1d3b1cb3c227ca01ba8559b38885b3a8

以上命令的执行结果,等价于以下 SQL:

1
DELETE FROM T WHERE dt >= '20230503';

笔者当前演示这些操作的时候,Paimon 0.4 尚不支持在 Flink SQL 中直接执行 DELETE FROM T WHERE dt >= '20230503'; 语句进行删除,报错信息如下:

1
2
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Can't perform delete operation of the table paimon.default.T because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete.

具体执行 SQL 的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Flink SQL> CREATE CATALOG paimon WITH (
> 'type' = 'paimon',
> 'warehouse' = 'file:///tmp/paimon'
> );
>
>
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG paimon;
[INFO] Execute statement succeed.

Flink SQL> SELECT * FROM T;
[INFO] Result retrieval cancelled.

Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Execute statement succeed.

Flink SQL> DELETE FROM T WHERE dt >= '20230503';
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Can't perform delete operation of the table paimon.default.T because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete.

Flink SQL>

因此,这里使用客户端命令进行删除操作。

另外,还要注意的点是,命令中引用的是 paimon-flink-action-0.4.0-incubating.jar,而不是 paimon-flink-1.17-0.4.0-incubating.jar,请不要混淆了。

关于 delete 操作的更多信息,可以通过以下命令详细了解:

1
<FLINK_HOME>/bin/flink run /path/to/paimon-flink-action-{{< version >}}.jar delete --help

如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
./bin/flink run ./lib/paimon-flink-action-0.4.jar delete --help
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/mnt/d/opt/flink-1.17.1/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Action "delete" deletes data from a table.

Syntax:
delete --warehouse <warehouse-path> --database <database-name> --table <table-name> --where <filter_spec>
delete --path <table-path> --where <filter_spec>

The '--where <filter_spec>' part is equal to the 'WHERE' clause in SQL DELETE statement. If you want delete all records, please use overwrite (see doc).

Examples:
delete --path hdfs:///path/to/warehouse/test_db.db/test_table --where id > (SELECT count(*) FROM employee)
It's equal to 'DELETE FROM test_table WHERE id > (SELECT count(*) FROM employee)

现在继续探讨删除操作对的表文件产生的影响。

第三次 commit 后,创建了一个新的快照 snapshot-3。继续使用 tree 命令来列出表下的文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/tmp/paimon
└── default.db
└── T
├── dt=20230501
│   └── bucket-0
│   └── data-c4f3cde1-8759-4f69-abc2-da6bea4911a1-0.orc
├── dt=20230502
│   └── bucket-0
│   └── data-8ccf8d2d-607b-4b9f-bf91-ef6542d41d05-0.orc
├── dt=20230503
│   └── bucket-0
│   ├── data-4cb7a3f9-2ad1-4f7b-9cf3-4445029244cf-0.orc
│   └── data-d6cccfda-45b7-4640-8ca9-5455be7dfdc0-0.orc
├── dt=20230504
│   └── bucket-0
│   ├── data-ae0d5e30-8178-4caa-b311-f39bbf116ed7-0.orc
│   └── data-ffaa7f9f-a112-4da0-9117-630fa02354f8-0.orc
├── dt=20230505
│   └── bucket-0
│   ├── data-78c70a1f-f51c-4ebd-822d-2f1f4a47ef9f-0.orc
│   └── data-7bc0e30b-7509-45ca-a191-3ef30a173af0-0.orc
├── dt=20230506
│   └── bucket-0
│   ├── data-95631ff5-7c56-4600-9e7a-1e3976016d28-0.orc
│   └── data-f416780e-d673-49e3-b3e3-bbe390e9c00c-0.orc
├── dt=20230507
│   └── bucket-0
│   ├── data-001834d2-3129-47da-b8cd-b61eb227d384-0.orc
│   └── data-367c1275-e6d7-4a94-b8b6-9edc6a84d57d-0.orc
├── dt=20230508
│   └── bucket-0
│   ├── data-83161511-dc36-421a-bba7-ce8e46c7d5f8-0.orc
│   └── data-bb095dcc-3c51-4a29-b408-3da9a325c47a-0.orc
├── dt=20230509
│   └── bucket-0
│   ├── data-220babec-5065-4e24-b30d-d46d060a6e7a-0.orc
│   └── data-69f3f4eb-cf80-4741-92b3-9cb396354ac2-0.orc
├── dt=20230510
│   └── bucket-0
│   ├── data-073c53a1-899e-416f-b43e-3433a541057b-0.orc
│   └── data-71177b4a-0d97-4994-9447-c541ef61145b-0.orc
├── manifest
│   ├── manifest-16b42374-154b-4aaf-8d15-2aa0ba41ade1-0
│   ├── manifest-401272a3-05f2-4d51-8a3d-27ecee17ca0e-0
│   ├── manifest-9ed893d6-4f96-4f5a-bc9b-3f5da726a8d1-0
│   ├── manifest-list-28bc3d79-2688-4c70-b610-8fd359901eec-0
│   ├── manifest-list-28bc3d79-2688-4c70-b610-8fd359901eec-1
│   ├── manifest-list-98fad66b-1a62-4657-9127-39e31bd26b88-0
│   ├── manifest-list-98fad66b-1a62-4657-9127-39e31bd26b88-1
│   ├── manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-0
│   └── manifest-list-c996ae59-9dc5-4d67-b691-7f1c20d71526-1
├── schema
│   └── schema-0
└── snapshot
├── EARLIEST
├── LATEST
├── snapshot-1
├── snapshot-2
└── snapshot-3

25 directories, 33 files

你会发现,从分区 dt=20230503 到分区 dt=20230510 都没有被删除。相反,在这些分区里创建这了一个新的数据文件,查看分区 dt=20230510 下的信息:

1
2
3
4
ll /tmp/paimon/default.db/T/dt=20230510/bucket-0
total 8.0K
-rw-r--r-- 1 root root 815 Jun 11 10:32 data-073c53a1-899e-416f-b43e-3433a541057b-0.orc
-rwxrwxrwx 1 root root 801 Jun 10 10:29 data-71177b4a-0d97-4994-9447-c541ef61145b-0.orc

这是可以理解的,因为我们在第二次 commit 时插入了一条记录(用 +I[10, 10010, 'varchar00010', '20230510'] 表示),然后在第三次 commit 时删除了这条记录。

执行 SELECT * FROM T 将返回 2 条记录,即:

1
2
3
id           a                              b                             dt
1 10001 varchar00001 20230501
2 10002 varchar00002 20230502

从 snapshot-3 开始,新的文件布局看起来类似下图:

snapshot-3

请注意,manifest-3-0 包含 8 个 ADD 操作类型的 manifest 条目,对应于 8 个新写入的数据文件。

压缩表

你可能已经注意到,小文件的数量会在连续的快照中增加,这可能会导致读取性能下降。因此,为了减少小文件的数量,需要进行完全压缩。

现在我们触发完全压缩。确保你已经将执行模式设置为批处理(在 flink-conf.yaml 中添加一个条目 execution.runtime-mode: batch,或在执行命令时添加参数 -Dexecution.runtime-mode=BATCH),并通过 flink run 运行一个专门的压缩作业:

1
2
3
4
5
6
7
8
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-0.4.0-incubating.jar \
compact \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition <partition-name>] \
[--catalog-conf <paimon-catalog-conf>[--catalog-conf <paimon-catalog-conf> ...]] \

本文这里的执行命令是(当前已经在 Flink 的主目录中):

1
./bin/flink run -Dexecution.runtime-mode=BATCH ./lib/paimon-flink-action-0.4.0-incubating.jar compact --path file:///tmp/paimon/default.db/T

所有当前的表文件将被压缩,并产生一个新的快照,即 snapshot-4,使用 vim 命令打开文件:

1
vim /tmp/paimon/default.db/T/snapshot/snapshot-4

snapshot-4 包含以下信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"version" : 3,
"id" : 4,
"schemaId" : 0,
"baseManifestList" : "manifest-list-7456ab8e-bc9e-4d9e-853f-cca13a8afc7a-0",
"deltaManifestList" : "manifest-list-7456ab8e-bc9e-4d9e-853f-cca13a8afc7a-1",
"changelogManifestList" : null,
"commitUser" : "1541550f-4635-4a8f-a44a-b8e067249476",
"commitIdentifier" : 9223372036854775807,
"commitKind" : "COMPACT",
"timeMillis" : 1686367786486,
"logOffsets" : { },
"totalRecordCount" : 38,
"deltaRecordCount" : 20,
"changelogRecordCount" : 0,
"watermark" : -9223372036854775808
}

从 snapshot-4 开始,新的文件布局看起来如下

snapshot-4

请注意,manifest-4-0 包含 20 个清单条目(18 个 DELETE 操作和 2 个 ADD 操作)。

  1. 对于分区 dt=20230503 至分区 dt=20230510,对两个数据文件进行两次 DELETE 操作;
  2. 对于分区 dt=20230501 至分区 dt=20230502,对同一数据文件进行一次 DELETE 操作和一次 ADD 操作。

更改表

在 Flink SQL 客户端执行下面的语句来配置完全压缩:

1
ALTER TABLE T SET ('full-compaction.delta-commits' = '1');

执行上述语句会为 Paimon 表创建一个新的模式,即 schema-1。但是,在下次 commit 之前,新创建的模式 schema-1 实际上还没有被使用。

快照过期

注意,只有在快照过期且没有消费者依赖快照时,标记的数据文件才会被真正删除。欲了解更多信息,请参见快照过期

在快照过期过程中,首先确定快照的范围,然后将这些快照内的数据文件标记为删除。仅当存在引用该特定数据文件的类型为 DELETE 的清单条目时,数据文件才被标记为删除。此标记可确保该文件不会被后续快照使用,可以安全地删除。

假设上图中的 4 个快照都要过期了。过期的流程如下:

  1. 首先删除所有标记的数据文件,并记录任何更改的桶。
  2. 然后删除任何更新日志文件和关联的清单文件。
  3. 最后,删除快照本身、写入最早的提示文件。

如果在删除过程中,有任何目录是空的,它们也会被删除。

假设创建了另一个快照,即 snapshot-5,并且触发了快照过期。snapshot-1snapshot-4 将被删除。为简单起见,我们只关注前面的快照中的文件,快照过期后的最终布局如下:

snapshot-5

结果,分区 2023050320230510 被物理删除。

Flink流写入

最后,我们将通过使用 CDC 摄取的例子来研究 Flink 流写入。本节将讨论捕获和写入变化数据到 Paimon 的方法,以及异步压缩和快照提交和过期背后的机制。

首先,让我们仔细研究 CDC 数据摄取的工作流程,以及每个相关组件所扮演的独特角色。

Untitled

  1. MySQL CDC Source 统一读取快照数据和增量数据,其中 SnapshotReader 读取快照数据,BinlogReader 读取增量数据。
  2. Paimon Sink 将数据写入 Paimon 表中,并在桶级别进行归类。其中的 CompactManager 会异步触发数据压缩。
  3. Committer Operator 是一个单例(意味着整个流程中只有一个这样的组件),负责提交快照数据和删除过期的快照数据。

接下来,我们介绍一下端到端的数据流。

Untitled

MySQL Cdc Source 读取快照和增量数据,在归一化(规范化)后将其发出到下游。

Untitled

Paimon Sink首先在基于堆的LSM树中缓冲新的记录,并在内存缓冲区满时将其刷入磁盘。请注意,每写一个数据文件都是一个排序的运行。在这一点上,没有创建清单文件和快照。就在Flink检查点发生之前,Paimon Sink将刷新所有缓冲区的记录,并向下游发送可提交的消息,在检查点期间由Committer Operator读取并提交。

Paimon Sink 首先在基于堆的 LSM 树中缓冲新的记录,当内存缓冲区满时将其刷新到磁盘。请注意,每个写入的数据文件都是排序段(sorted run)。在这一点上,没有创建清单文件和快照。就在 Flink 检查点发生之前,Paimon Sink 将刷新所有缓冲区的记录,并向下游发送可提交的消息,该消息由 Committer Operator 在检查点期间读取和提交。

Untitled

在检查点期间,Committer Operator 将创建一个新快照,并将其与清单列表关联,以便该快照包含表内所有数据文件的信息。

Untitled

之后可能会发生异步压缩,并且 CompactManager 产生的可提交内容包含有关以前文件和合并文件的数据信息,以便 Committer Operator 可以构建相应的清单条目。在这种情况下,Committer Operator 可能在 Flink 检查点期间产生两个快照,一个用于写入的数据(类型为 Append 的快照),另一个用于压缩(类型为 Compact 的快照)。如果在检查点区间内没有写入数据文件,则仅创建类型为 Compact 的快照。Committer Operator 将检查快照的过期情况,并对标记的数据文件进行物理删除。

(END)