I am a Flink beginner. I want to add support for doing statistics over data with Flink SQL in my Flink application, but I don't know which dependencies I should add.
The program is written in Java, and the Flink version is the current latest, 1.16.1. The program uses Flink SQL to read data from Kafka and print what it reads straight to standard output. The Kafka data is MySQL binlog captured by Canal, so the Table API connectors involved are the Kafka connector and the Canal (canal-json) format.
The pom.xml dependencies are as follows:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>1.16.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>1.16.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
The program code is as follows:
```java
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
    TableConfig tableConfig = TableConfig.getDefault();
    tableConfig.setIdleStateRetention(Duration.ofDays(1L));
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    tableEnv.executeSql(""
            + "CREATE TABLE `tb_user` ( "
            + "  `id` INT, "
            + "  `area_code` STRING, "
            + "  `phone` STRING, "
            + "  `password` STRING, "
            + "  `nickname` STRING, "
            + "  `avatar` STRING "
            + ") WITH ( "
            + "  'connector' = 'kafka', "
            + "  'topic' = 'dwd_user', "
            + "  'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092', "
            + "  'properties.group.id' = 'dwd_user_v230215', "
            + "  'scan.startup.mode' = 'earliest-offset', "
            + "  'format' = 'canal-json' "
            + ")"
    );

    Table table = tableEnv.sqlQuery("select * from tb_user");
    table.printSchema();

    DataStream<Row> resultStream = tableEnv.from(table);
    resultStream.print();

    env.execute("TT");
}
```
In the code above, the dependencies you have for the Kafka connector and for the Canal JSON format that Flink SQL uses are correct.
The Kafka connector dependency for the Flink Table API:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
```
The Canal JSON parsing dependency for the Flink Table API (the canal-json format ships as part of flink-json, so no extra artifact is needed for it):
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
```
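For orientation, here is a simplified sketch of the change-event envelope that canal-json parses, for an insert into `tb_user`. The column values and database name are made up for illustration, and a real Canal message carries additional metadata fields (e.g. `isDdl`, `mysqlType`, `sqlType`, `pkNames`):

```json
{
  "data": [
    {
      "id": "1",
      "area_code": "+86",
      "phone": "13800000000",
      "password": "******",
      "nickname": "tom",
      "avatar": "https://example.com/avatar/1.png"
    }
  ],
  "old": null,
  "type": "INSERT",
  "database": "app_db",
  "table": "tb_user",
  "es": 1676448000000,
  "ts": 1676448000123
}
```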
Because your application needs to convert a Table into a DataStream to print it, i.e. you are mixing the DataStream API with the Table API & SQL, the dependency you need to add is flink-table-api-java-bridge rather than flink-table-api-java.
The flink-table-api-java-bridge dependency is as follows:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.16.1</version>
</dependency>
```
After fixing the dependencies, modify your application code as follows:
```java
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();

    // Changed: create a StreamTableEnvironment (the public entry point from
    // org.apache.flink.table.api.bridge.java, provided by the bridge artifact)
    // on top of the existing StreamExecutionEnvironment.
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    // Set idle state retention on the environment's own TableConfig; a standalone
    // TableConfig.getDefault() instance is never picked up by the environment.
    tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));

    tableEnv.executeSql(""
            + "CREATE TABLE `tb_user` ( "
            + "  `id` INT, "
            + "  `area_code` STRING, "
            + "  `phone` STRING, "
            + "  `password` STRING, "
            + "  `nickname` STRING, "
            + "  `avatar` STRING "
            + ") WITH ( "
            + "  'connector' = 'kafka', "
            + "  'topic' = 'dwd_user', "
            + "  'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092', "
            + "  'properties.group.id' = 'dwd_user_v230215', "
            + "  'scan.startup.mode' = 'earliest-offset', "
            + "  'format' = 'canal-json' "
            + ")"
    );

    Table table = tableEnv.sqlQuery("select * from tb_user");
    table.printSchema();

    // Changed: convert the Table into a changelog DataStream before printing.
    DataStream<Row> resultStream = tableEnv.toChangelogStream(table);
    resultStream.print();

    env.execute("TT");
}
```
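One more point on the conversion: toChangelogStream is used here rather than toDataStream, because a canal-json source produces an updating table (inserts, updates, and deletes), while toDataStream only accepts insert-only tables and fails on updating input. A minimal sketch of working with the change flags, reusing the tableEnv and table from the code above:

```java
// Convert the updating table into a changelog stream; each Row carries a
// RowKind flag (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) describing the change.
DataStream<Row> changelog = tableEnv.toChangelogStream(table);

// Row.toString() already prefixes the short form of the kind ("+I[...]",
// "-U[...]", "+U[...]", "-D[...]"); getKind() exposes it programmatically.
changelog
        .map(row -> row.getKind().shortString() + " " + row)
        .print();
```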
NOTE: Flink provides two main APIs: the DataStream API and the Table API & SQL. Depending on your use case, they can be used on their own or mixed together.
The APIs map to dependencies as follows:
| API to use | Dependency to add |
| --- | --- |
| DataStream | flink-streaming-java |
| DataStream with Scala | flink-streaming-scala_2.12 |
| Table API | flink-table-api-java |
| Table API with Scala | flink-table-api-scala_2.12 |
| Table API + DataStream | flink-table-api-java-bridge |
| Table API + DataStream with Scala | flink-table-api-scala-bridge_2.12 |
The APIs here are divided into six dependencies by language (Java or Scala) and by whether they are used alone or mixed; which one to use depends on your application scenario.
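As a quick illustration of the Java rows in this table, a minimal sketch contrasting the two entry points (Flink 1.16 class names):

```java
// Table API only (flink-table-api-java): a TableEnvironment is self-contained,
// no StreamExecutionEnvironment is involved.
TableEnvironment pureTableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

// Table API + DataStream (flink-table-api-java-bridge): a StreamTableEnvironment
// wraps a StreamExecutionEnvironment, enabling Table <-> DataStream conversion.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment mixedTableEnv = StreamTableEnvironment.create(env);
```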