Changelog 表是创建表时的默认表类型。用户可以插入、更新或删除表中的记录。

主键由一列或多列组成,其值能唯一地标识表中的每一行记录。Paimon 强制对数据进行排序,这意味着系统将对每个桶内的主键进行排序。利用这个特性,用户可以通过在主键上添加过滤条件来实现高性能。

通过在变更日志表上定义主键,用户可以获得以下功能。

合并引擎

当 Paimon 接收器(sink)收到具有相同主键的两条或更多记录时,它将把这些记录合并成一条记录以保持主键的唯一性。通过指定 merge-engine 表属性,用户可以选择如何合并记录。

在 Flink SQL TableConfig 中始终将 table.exec.sink.upsert-materialize 设置为NONE,sink upsert-materialize 可能会导致奇怪的行为。当输入乱序时,建议使用 Sequence Field 进行乱序校正。

去重(Deduplicate)

deduplicate 合并引擎是默认的合并引擎。Paimon 只会保留最新的记录,并丢弃具有相同主键的其他记录。

具体来说,如果最新的记录是一条 DELETE 记录,那么所有主键相同的记录都会被删除。

部分更新(Partial Update)

通过指定 'merge-engine' = 'partial-update',用户可以在多次更新中设置记录的列,并最终获得完整的记录。具体来说,相同主键下的各个值字段会一个接一个地更新为最新数据,但 null 值不会被覆盖。

例如,假设 Paimon 收到三条记录:

  • <1, 23.0, 10, NULL>
  • <1, NULL, NULL, 'This is a book'>
  • <1, 25.2, NULL, NULL>

如果第一列是主键,最终结果将是 <1, 25.2, 10, 'This is a book'>

对于流处理查询,partial-update 合并引擎必须与 lookupfull-compaction 一起使用。

Partial 不能接收 DELETE 消息,因为无法定义该行为。可以配置partial-update.ignore-delete 以忽略 DELETE 消息。

聚合(Aggregation)

注意:在 Flink SQL TableConfig 中始终将 table.exec.sink.upsert-materialize 设置为 NONE

有时候用户只关心聚合的结果。aggregation 合并引擎根据聚合函数,在同一个主键下,将每个值域与最新数据逐一聚合。

每个不属于主键的字段都可以被赋予一个聚合函数,由 fields.<field-name>.aggregate-function 表属性指定,否则它将使用 last_non_null_value 聚合作为默认值。

例如,考虑下表定义:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);

字段 price 通过 max 函数进行聚合,字段 sales 通过 sum 函数进行聚合。如果给定两条输入记录 <1, 23.0, 15><1, 30.2, 20>,那么最终结果为 <1, 30.2, 35>

当前支持的聚合函数及其数据类型如下:

  • sum:支持 DECIMALTINYINTSMALLINTINTEGERBIGINTFLOATDOUBLE 等数据类型。
  • min/max:支持 DECIMALTINYINTSMALLINTINTEGERBIGINTFLOATDOUBLEDATETIMETIMESTAMPTIMESTAMP_LTZ 等数据类型。
  • last_value / last_non_null_value:支持所有的数据类型。
  • listagg:支持 STRING 数据类型。
  • bool_and / bool_or:支持 BOOLEAN 数据类型。

只有 sum 支持撤回(UPDATE_BEFOREDELETE),其他聚合函数不支持撤回。如果允许某些函数忽略撤回消息,可以配置:'fields.${field_name}.ignore-retract'='true'

对于流处理查询,aggregation 合并引擎必须与 lookupfull-compaction 一起使用。

变更日志生成器(Changelog Producers)

流式查询会持续生成最新变更。这些变更可以来自底层表文件或外部日志系统(如Kafka)。与外部日志系统相比,表文件中的变更成本更低,但延迟更高(取决于快照创建的频率)。

通过在创建表时指定 changelog-producer 表属性,用户可以选择从文件生成的变更模式。

changelog-producer 表属性仅影响来自文件中的变更日志。它不会影响外部日志系统。

None

默认情况下,不会将额外的变更日志生成器应用于表的写入器。Paimon 源只能在快照之间看到合并的变更,比如删除的键、某些键的新值是什么。

但是,这些合并的变更不能形成完整的变更日志,因为我们不能直接从中读取键的旧值。合并的变更要求消费者“记住”每个键的值,并在不看到旧值的情况下重写值。然而,某些消费者需要旧值来确保正确性或效率。

考虑一个按分组键(可能不等于主键)计算总和的消费者。如果消费者只看到新值“5”,它无法确定应添加到求和结果的值。例如,如果旧值是“4”,应将“1”添加到结果中。但是如果旧值是“6”,则应从结果中减去“1”。对这些类型的消费者来说,旧值非常重要。

总而言之,none 变更日志生成器最适合数据库系统等消费者。Flink 也有一个内置的“normalize”运算符,它将每个键的值持久化在状态中。很明显,这个运算符的成本非常高,应避免使用。

Untitled

Input

通过指定 'changelog-producer' = 'input',Paimon 写入器将其输入作为完整变更日志的来源。所有输入记录将保存在单独的变更日志文件中,并由 Paimon 源提供给消费者。

当 Paimon 写入器的输入是完整的变更日志(例如来自数据库 CDC 或由 Flink 有状态计算生成)时,可以使用 input 变更日志生成器。

Untitled

Lookup

这是一个试验性的功能。

如果输入无法产生完整的变更日志,但仍想消除昂贵的标准化操作符,则可以考虑使用 'lookup' 变更日志生成器。

通过指定 'changelog-producer' = 'lookup',Paimon 会在提交数据写入之前通过 'lookup' 生成变更日志。

Untitled

Lookup 会将数据缓存在内存和本地磁盘上,可以使用以下选项来调整性能:

选项 默认值 类型 描述
lookup.cache-file-retention 1 h 时长 缓存文件的保留时长。文件过期后,如果有访问需求,会从 DFS 中重新读取以在本地磁盘上构建索引。
lookup.cache-max-disk-size unlimited 内存大小 缓存的最大磁盘大小,可以使用此选项来限制本地磁盘的使用。
lookup.cache-max-memory-size 256 mb 内存大小 缓存的最大内存大小。

Full Compaction

如果觉得 'lookup' 的资源消耗太大,可以考虑使用 'full-compaction' 变更日志生成器,它可以解耦数据写入和变更日志生成,更适合具有高延迟(比如 10 分钟)的场景。

通过指定 'changelog-producer' = 'full-compaction',Paimon 将比较完全压缩之间的结果并将差异产生为变更日志(changelog)。变更日志的延迟受完全压缩的频率的影响。

通过指定 full-compaction.delta-commits 表属性,将在增量提交(检查点)之后不断触发完全压缩。这个属性默认设置为 1,所以每个检查点都会有一个完整的压缩,并产生一个变更日志。

Untitled

完全压缩变更日志生成器可以为任何类型的源产生完整的变更日志。但是,与 Input 变更日志生成器相比,它的效率较低,且产生变更日志的延迟可能较高。

序列字段(Sequence Field)

默认情况下,主键表根据输入顺序确定合并顺序(最后一条输入记录将是最后一个合并的)。然而,在分布式计算中,会有一些导致数据无序的情况。此时,可以使用一个时间字段作为 sequence.field,比如说:

当记录被更新或删除时,sequence.field 必须变大而不能保持不变。例如,可以使用Mysql Binlog 操作时间作为 sequence.field

1
2
3
4
5
6
7
8
CREATE TABLE MyTable (
pk BIGINT PRIMARY KEY NOT ENFORCED,
v1 DOUBLE,
v2 BIGINT,
dt TIMESTAMP
) WITH (
'sequence.field' = 'dt'
);

具有最大 sequence.field 值的记录将最后合并,而不考虑输入顺序。

(END)