I'm new to Flink. I want to add support for data statistics with Flink SQL to my Flink application, but I don't know which dependencies I should add.

The program is written in Java on Flink 1.16.1 (the latest version at the time of writing). It uses Flink SQL to read data from Kafka and print the records directly to standard output. The Kafka data is MySQL binlog captured by Canal, so the Table API pieces I need are the Kafka connector and the canal-json format.

My pom.xml dependencies are as follows:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <!-- The dependencies below are the ones I added myself -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

The program code is as follows:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
    TableConfig tableConfig = TableConfig.getDefault();
    tableConfig.setIdleStateRetention(Duration.ofDays(1L));
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    tableEnv.executeSql("" +
            "CREATE TABLE `tb_user` ( " +
            "  `id` INT, " +
            "  `area_code` STRING, " +
            "  `phone` STRING, " +
            "  `password` STRING, " +
            "  `nickname` STRING, " +
            "  `avatar` STRING " +
            ") WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'dwd_user', " +
            "  'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092', " +
            "  'properties.group.id' = 'dwd_user_v230215', " +
            "  'scan.startup.mode' = 'earliest-offset', " +
            "  'format' = 'canal-json' " +
            ")"
    );

    Table table = tableEnv.sqlQuery("select * from tb_user");
    table.printSchema();

    // Here I want to convert the Table into a DataStream so I can print it,
    // but TableEnvironment has no such method and this line does not compile?
    DataStream<Row> resultStream = tableEnv.from(table);

    resultStream.print();

    env.execute("TT");
}

In your code above, the dependencies for the Kafka connector and for the canal-json format used by Flink SQL are correct.

Kafka connector dependency for the Flink Table API:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

Dependency for parsing the Canal JSON format with the Flink Table API (the canal-json format ships with flink-json); a sample of the messages it parses is shown after the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
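
For context, a canal-json message for the tb_user table looks roughly like the sketch below (all values, the database name and the timestamps are made up for illustration; Canal serializes column values as strings, and the canal-json format mainly reads the data, old and type fields and maps the columns back to the declared types):

{
  "data": [
    {
      "id": "1",
      "area_code": "+86",
      "phone": "13800000000",
      "password": "******",
      "nickname": "tom",
      "avatar": "https://example.com/avatar/1.png"
    }
  ],
  "database": "app_db",
  "table": "tb_user",
  "type": "INSERT",
  "old": null,
  "es": 1676448000000,
  "ts": 1676448000123,
  "isDdl": false
}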

Because your application needs to convert a Table into a DataStream in order to print it, i.e. you are mixing the DataStream API with the Table API & SQL, the dependency you need is flink-table-api-java-bridge rather than flink-table-api-java.

The flink-table-api-java-bridge dependency is:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.16.1</version>
</dependency>

After updating the dependencies, your application code can be modified as follows (the changed lines are marked with comments):

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();

    // Changed: create a StreamTableEnvironment (from flink-table-api-java-bridge)
    // instead of a plain TableEnvironment, so Table <-> DataStream conversions are available.
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    // Set the idle state retention on the environment's own config so it actually takes effect.
    tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));

    tableEnv.executeSql("" +
            "CREATE TABLE `tb_user` ( " +
            "  `id` INT, " +
            "  `area_code` STRING, " +
            "  `phone` STRING, " +
            "  `password` STRING, " +
            "  `nickname` STRING, " +
            "  `avatar` STRING " +
            ") WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'dwd_user', " +
            "  'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092', " +
            "  'properties.group.id' = 'dwd_user_v230215', " +
            "  'scan.startup.mode' = 'earliest-offset', " +
            "  'format' = 'canal-json' " +
            ")"
    );

    Table table = tableEnv.sqlQuery("select * from tb_user");
    table.printSchema();

    // Changed: convert the Table into a changelog stream (canal-json emits inserts,
    // updates and deletes) and print it with the DataStream API.
    DataStream<Row> resultStream = tableEnv.toChangelogStream(table);

    resultStream.print();

    env.execute("TT");
}
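
One note on the conversion method: canal-json produces a changelog (inserts, updates and deletes), so the table has to be converted with toChangelogStream. toDataStream only works for insert-only tables and would fail at runtime here; a minimal sketch of that alternative, for a hypothetical append-only source:

// Only valid when the source table is insert-only (e.g. a plain 'json' format source);
// for this canal-json table it would throw because of update/delete rows.
DataStream<Row> appendStream = tableEnv.toDataStream(table);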

NOTE: Flink provides two main APIs: the DataStream API and the Table API & SQL. Depending on your use case, they can be used on their own or mixed.

The mapping between APIs and dependencies is:

API you want to use                     Dependency to add
DataStream                              flink-streaming-java
DataStream with Scala                   flink-streaming-scala_2.12
Table API                               flink-table-api-java
Table API with Scala                    flink-table-api-scala_2.12
Table API + DataStream                  flink-table-api-java-bridge
Table API + DataStream with Scala       flink-table-api-scala-bridge_2.12

The APIs are split into these six dependencies by language (Java vs. Scala) and by whether the two API families are used on their own or mixed; which one you need depends on your application.
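
To illustrate the "Table API + DataStream" row, here is a minimal, self-contained sketch (the class name and sample data are made up; it only assumes the flink-table-api-java-bridge dependency discussed above) that uses both bridge conversions, fromDataStream and toDataStream:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BridgeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // DataStream -> Table: register an in-memory stream as a table (single column named f0)
        DataStream<String> words = env.fromElements("flink", "sql", "kafka");
        Table wordsTable = tableEnv.fromDataStream(words);

        // Table -> DataStream: run SQL over it and convert the insert-only result back
        Table upper = tableEnv.sqlQuery("SELECT UPPER(f0) FROM " + wordsTable);
        DataStream<Row> result = tableEnv.toDataStream(upper);

        result.print();
        env.execute("bridge-example");
    }
}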
