如果一个表没有定义主键,那么它默认是一个 append-only 表。

你只能向表中插入完整的记录。不支持删除或更新,也不能定义主键。这种类型的表适用于不需要更新的用例(例如日志数据同步)。

分桶(Bucketing)

你也可以为 Append-only 表定义桶的数量,见 Bucket。

建议设置 bucket-key 字段。否则,数据会按照整行进行散列,性能会很差。

压缩(Compaction

默认情况下,接收节点(sink 节点)将自动进行压缩以控制文件数量。以下选项控制压缩策略:

Key 默认值 类型 描述
write-only false Boolean 如果设置为 true,压缩和快照过期将被跳过。此选项与专门的压缩作业一起使用。
compaction.min.file-num 5 Integer 对于文件集 [f_0,…,f_N],满足 sum(size(f_i)) >= targetFileSize 的最小文件数,以触发仅追加表的压缩操作。此值可避免压缩几乎完整的文件,这不符合成本效益。
compaction.max.file-num 50 Integer 对于文件集 [f_0,…,f_N],触发仅追加表压缩的最大文件数,即使 sum(size(f_i)) < targetFileSize。该值可避免保留过多的小文件,从而降低性能。
full-compaction.delta-commits (none) Integer 全量压缩将在增量提交后不断触发。

流源(Streaming Source)

目前只有 Flink 引擎支持流源行为。

流式读取顺序

对于流式读取,记录的生成顺序如下:

  • 对于来自两个不同分区的任意两条记录:
    • 如果 scan.plan-sort-partition 设置为 true,则先生成分区值较小的记录。
    • 否则,先生成分区创建时间较早的记录。
  • 对于来自同一分区和同一桶的任意两条记录,先写入的记录先产生。
  • 对于来自同一分区但两个不同桶的任意两条记录,不同的桶由不同的任务处理,它们之间没有顺序保证。

水印定义

可以定义用于读取 Paimon 表的水印:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE T (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

也可以启用 Flink 水印对齐机制,这可以确保不会有 sources/splits/shards/partitions 将其水印提前太远:

Key 默认值 类型 描述
scan.watermark.alignment.group (none) String 一组用于对齐水印的源
scan.watermark.alignment.max-drift (none) Duration 在暂停消费 source/task/partition 之前用于对齐水印的最大偏差。

有界流(Bounded Stream)

流源也可以是有界的,可以指定“scan.bounded.watermark”来定义有界流模式的结束条件,流读取将一直持续到遇到一个更大的快照水印为止。

快照中的水印是由写入器生成的,例如,可以指定一个 Kafka 源和声明水印的定义。当使用此 Kafka 源向 Paimon 表写入数据时,Paimon 表的快照会生成相应的水印,这样就可以在流式读取该 Paimon 表时使用有界水印的功能。

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE kafka_table (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);

-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;

-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

示例

下面是一个创建 Append-Only 表并指定桶键的例子。

1
2
3
4
5
6
7
8
CREATE TABLE MyTable (
product_id BIGINT,
price DOUBLE,
sales BIGINT
) WITH (
'bucket' = '8',
'bucket-key' = 'product_id'
);

(END)