除了底层表文件的变更日志外,Paimon 的变更日志也可以存储在外部日志系统中,比如 Kafka,或者从外部日志系统中消费。通过指定 log.system 表属性,用户可以选择使用哪个外部日志系统。

如果选择使用外部日志系统,那么所有写入表文件中的记录也会写入日志系统。因此,流查询产生的变化会从日志系统读取,而不是表文件。

一致性保证

默认情况下,日志系统中的变化要等到快照之后才对消费者可见,就像表文件一样。这种行为保证了精确一次的语义。也就是说,每条记录只被消费者看到一次。

但是,用户也可以指定表的属性 'log.consistency' = 'eventual',这样写进日志系统的变更日志就可以立即被消费者消费,而不用等待下一个快照。这种行为减少了变更日志的延迟,但由于可能发生的故障,它只能保证至少一次的语义(即,消费者可能会看到重复的记录)。

如果设置 'log.consistency' = 'eventual',为了获得正确的结果,Flink 中的 Paimon 源将自动添加“normalize”运算符进行重复数据删除。该运算符将每个键的值保存在状态中。可以很容易地看出,该运算符的成本将非常高,应该避免使用。

支持的日志系统

Kafka

准备 flink-sql-connector-kafka Jar 文件

Paimon 目前支持 Flink 1.17,1.16,1.15 和 1.14 版本。推荐使用最新的 Flink 版本以获得更好的体验。

下载相应版本的 flink-sql-connector-kafka jar 文件。

Flink 版本 Jar 包
Flink 1.17 https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.0/flink-sql-connector-kafka-1.17.0.jar
Flink 1.16 https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.1/flink-sql-connector-kafka-1.16.1.jar
Flink 1.15 https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar
Flink 1.14 https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar

通过指定 ‘log.system’ = ‘kafka’,用户可以将更改与表文件一起写入 Kafka。

通过指定 'log.system' = 'kafka',用户可以将变更写入 Kafka 和表文件。

1
2
3
4
5
6
CREATE TABLE T (...)
WITH (
'log.system' = 'kafka',
'kafka.bootstrap.servers' = '...',
'kafka.topic' = '...'
);

Kafka 的表属性如下所示:

Key 默认值 类型 描述
kafka.bootstrap.servers (none) String 必填,Kafka 服务器连接字符串。
kafka.topic (none) String Kafka 主题(Topic)

(END)