Apache Paimon | 主键表
Changelog 表是创建表时的默认表类型。用户可以插入、更新或删除表中的记录。
主键由一列或多列组成,其值能唯一地标识表中的每一行记录。Paimon 强制对数据进行排序,这意味着系统将对每个桶内的主键进行排序。利用这个特性,用户可以通过在主键上添加过滤条件来实现高性能。
通过在变更日志表上定义主键,用户可以获得以下功能。
合并引擎
当 Paimon 接收器(sink)收到具有相同主键的两条或更多记录时,它将把这些记录合并成一条记录以保持主键的唯一性。通过指定 merge-engine
表属性,用户可以选择如何合并记录。
在 Flink SQL TableConfig 中始终将
table.exec.sink.upsert-materialize
设置为NONE
,sink upsert-materialize 可能会导致奇怪的行为。当输入乱序时,建议使用 Sequence Field 进行乱序校正。
去重(Deduplicate)
deduplicate
合并引擎是默认的合并引擎。Paimon 只会保留最新的记录,并丢弃具有相同主键的其他记录。
具体来说,如果最新的记录是一条 DELETE
记录,那么所有主键相同的记录都会被删除。
部分更新(Partial Update)
通过指定 'merge-engine' = 'partial-update'
,用户可以在多次更新中设置记录的列,并最终获得完整的记录。具体来说,相同主键下的各个值字段会一个接一个地更新为最新数据,但 null
值不会被覆盖。
例如,假设 Paimon 收到三条记录:
<1, 23.0, 10, NULL>
<1, NULL, NULL, 'This is a book'>
<1, 25.2, NULL, NULL>
如果第一列是主键,最终结果将是 <1, 25.2, 10, 'This is a book'>
。
对于流处理查询,
partial-update
合并引擎必须与lookup
或full-compaction
一起使用。Partial 不能接收
DELETE
消息,因为无法定义该行为。可以配置partial-update.ignore-delete
以忽略DELETE
消息。
聚合(Aggregation)
注意:在 Flink SQL TableConfig 中始终将
table.exec.sink.upsert-materialize
设置为NONE
。
有时候用户只关心聚合的结果。aggregation
合并引擎根据聚合函数,在同一个主键下,将每个值域与最新数据逐一聚合。
每个不属于主键的字段都可以被赋予一个聚合函数,由 fields.<field-name>.aggregate-function
表属性指定,否则它将使用 last_non_null_value
聚合作为默认值。
例如,考虑下表定义:
1 | CREATE TABLE MyTable ( |
字段 price
通过 max
函数进行聚合,字段 sales
通过 sum
函数进行聚合。如果给定两条输入记录 <1, 23.0, 15>
和 <1, 30.2, 20>
,那么最终结果为 <1, 30.2, 35>
。
当前支持的聚合函数及其数据类型如下:
sum
:支持DECIMAL
、TINYINT
、SMALLINT
、INTEGER
、BIGINT
、FLOAT
和DOUBLE
等数据类型。min/max
:支持DECIMAL
、TINYINT
、SMALLINT
、INTEGER
、BIGINT
、FLOAT
、DOUBLE
、DATE
、TIME
、TIMESTAMP
和TIMESTAMP_LTZ
等数据类型。last_value / last_non_null_value
:支持所有的数据类型。listagg
:支持STRING
数据类型。bool_and / bool_or
:支持BOOLEAN
数据类型。
只有 sum
支持撤回(UPDATE_BEFORE
和 DELETE
),其他聚合函数不支持撤回。如果允许某些函数忽略撤回消息,可以配置:'fields.${field_name}.ignore-retract'='true'
。
对于流处理查询,
aggregation
合并引擎必须与lookup
或full-compaction
一起使用。
变更日志生成器(Changelog Producers)
流式查询会持续生成最新变更。这些变更可以来自底层表文件或外部日志系统(如Kafka)。与外部日志系统相比,表文件中的变更成本更低,但延迟更高(取决于快照创建的频率)。
通过在创建表时指定 changelog-producer
表属性,用户可以选择从文件生成的变更模式。
changelog-producer
表属性仅影响来自文件中的变更日志。它不会影响外部日志系统。
None
默认情况下,不会将额外的变更日志生成器应用于表的写入器。Paimon 源只能在快照之间看到合并的变更,比如删除的键、某些键的新值是什么。
但是,这些合并的变更不能形成完整的变更日志,因为我们不能直接从中读取键的旧值。合并的变更要求消费者“记住”每个键的值,并在不看到旧值的情况下重写值。然而,某些消费者需要旧值来确保正确性或效率。
考虑一个按分组键(可能不等于主键)计算总和的消费者。如果消费者只看到新值“5”,它无法确定应添加到求和结果的值。例如,如果旧值是“4”,应将“1”添加到结果中。但是如果旧值是“6”,则应从结果中减去“1”。对这些类型的消费者来说,旧值非常重要。
总而言之,none
变更日志生成器最适合数据库系统等消费者。Flink 也有一个内置的“normalize”运算符,它将每个键的值持久化在状态中。很明显,这个运算符的成本非常高,应避免使用。
Input
通过指定 'changelog-producer' = 'input'
,Paimon 写入器将其输入作为完整变更日志的来源。所有输入记录将保存在单独的变更日志文件中,并由 Paimon 源提供给消费者。
当 Paimon 写入器的输入是完整的变更日志(例如来自数据库 CDC 或由 Flink 有状态计算生成)时,可以使用 input
变更日志生成器。
Lookup
这是一个试验性的功能。
如果输入无法产生完整的变更日志,但仍想消除昂贵的标准化操作符,则可以考虑使用 'lookup'
变更日志生成器。
通过指定 'changelog-producer' = 'lookup'
,Paimon 会在提交数据写入之前通过 'lookup'
生成变更日志。
Lookup 会将数据缓存在内存和本地磁盘上,可以使用以下选项来调整性能:
选项 | 默认值 | 类型 | 描述 |
---|---|---|---|
lookup.cache-file-retention | 1 h | 时长 | 缓存文件的保留时长。文件过期后,如果有访问需求,会从 DFS 中重新读取以在本地磁盘上构建索引。 |
lookup.cache-max-disk-size | unlimited | 内存大小 | 缓存的最大磁盘大小,可以使用此选项来限制本地磁盘的使用。 |
lookup.cache-max-memory-size | 256 mb | 内存大小 | 缓存的最大内存大小。 |
Full Compaction
如果觉得 'lookup'
的资源消耗太大,可以考虑使用 'full-compaction'
变更日志生成器,它可以解耦数据写入和变更日志生成,更适合具有高延迟(比如 10 分钟)的场景。
通过指定 'changelog-producer' = 'full-compaction'
,Paimon 将比较完全压缩之间的结果并将差异产生为变更日志(changelog)。变更日志的延迟受完全压缩的频率的影响。
通过指定 full-compaction.delta-commits
表属性,将在增量提交(检查点)之后不断触发完全压缩。这个属性默认设置为 1,所以每个检查点都会有一个完整的压缩,并产生一个变更日志。
完全压缩变更日志生成器可以为任何类型的源产生完整的变更日志。但是,与
Input
变更日志生成器相比,它的效率较低,且产生变更日志的延迟可能较高。
序列字段(Sequence Field)
默认情况下,主键表根据输入顺序确定合并顺序(最后一条输入记录将是最后一个合并的)。然而,在分布式计算中,会有一些导致数据无序的情况。此时,可以使用一个时间字段作为 sequence.field
,比如说:
当记录被更新或删除时,sequence.field
必须变大而不能保持不变。例如,可以使用Mysql Binlog 操作时间作为 sequence.field
。
1 | CREATE TABLE MyTable ( |
具有最大 sequence.field
值的记录将最后合并,而不考虑输入顺序。
(END)