I'm a Flink beginner. I want to add support for doing data statistics with Flink SQL in my Flink application, but I don't know which dependencies I need to add.
The program is written in Java, and the Flink version is the current latest release, 1.16.1. The job uses Flink SQL to read data from Kafka and writes what it reads straight to standard output. The Kafka data is MySQL binlog captured by Canal, so the Table API pieces I use are the Kafka connector and the canal-json format.
The pom.xml dependencies are as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>1.16.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>1.16.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
The program code is as follows:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
    TableConfig tableConfig = TableConfig.getDefault();
    tableConfig.setIdleStateRetention(Duration.ofDays(1L));
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    tableEnv.executeSql("" +
            "CREATE TABLE `tb_user` ( " +
            "  `id` INT, " +
            "  `area_code` STRING, " +
            "  `phone` STRING, " +
            "  `password` STRING, " +
            "  `nickname` STRING, " +
            "  `avatar` STRING " +
            ") WITH ( " +
            " 'connector' = 'kafka', " +
            " 'topic' = 'dwd_user', " +
            " 'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092', " +
            " 'properties.group.id' = 'dwd_user_v230215', " +
            " 'scan.startup.mode' = 'earliest-offset', " +
            " 'format' = 'canal-json' " +
            ")"
    );

    Table table = tableEnv.sqlQuery("select * from tb_user");
    table.printSchema();

    DataStream<Row> resultStream = tableEnv.from(table);

    resultStream.print();

    env.execute("TT");
}
In the code above, the dependencies for the Kafka connector and the canal-json format that Flink SQL uses are correct.
The Kafka connector dependency for the Flink Table API:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
The Canal JSON (canal-json) format dependency for the Flink Table API:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
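For context, the canal-json format expects each Kafka record to be a Canal change event rather than a bare row. A message for the `tb_user` table might look roughly like this (the database name and all field values are made-up examples, not taken from your setup):

{
  "data": [
    {
      "id": 1,
      "area_code": "86",
      "phone": "13800000000",
      "password": "******",
      "nickname": "tom",
      "avatar": "https://example.com/avatar/1.png"
    }
  ],
  "old": null,
  "type": "INSERT",
  "database": "app_db",
  "table": "tb_user",
  "es": 1676448000000,
  "ts": 1676448000123,
  "isDdl": false
}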
Because your application needs to convert a Table into a DataStream and print it, i.e. you are mixing the DataStream API with the Table API & SQL, the dependency you need to add is flink-table-api-java-bridge rather than flink-table-api-java.
The flink-table-api-java-bridge dependency is as follows:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.16.1</version>
</dependency>
After updating the dependencies, modify your application code as follows:
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
    TableConfig tableConfig = TableConfig.getDefault();
    tableConfig.setIdleStateRetention(Duration.ofDays(1L));
    // Changed: create a StreamTableEnvironment (provided by flink-table-api-java-bridge)
    // on top of the StreamExecutionEnvironment so Tables can be converted to DataStreams.
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

    tableEnv.executeSql("" +
            "CREATE TABLE `tb_user` ( " +
            "  `id` INT, " +
            "  `area_code` STRING, " +
            "  `phone` STRING, " +
            "  `password` STRING, " +
            "  `nickname` STRING, " +
            "  `avatar` STRING " +
            ") WITH ( " +
            " 'connector' = 'kafka', " +
            " 'topic' = 'dwd_user', " +
            " 'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092', " +
            " 'properties.group.id' = 'dwd_user_v230215', " +
            " 'scan.startup.mode' = 'earliest-offset', " +
            " 'format' = 'canal-json' " +
            ")"
    );

    Table table = tableEnv.sqlQuery("select * from tb_user");
    table.printSchema();

    // Changed: canal-json can emit updates and deletes as well as inserts,
    // so convert the Table to a changelog stream instead of an insert-only stream.
    DataStream<Row> resultStream = tableEnv.toChangelogStream(table);

    resultStream.print();

    env.execute("TT");
}
NOTE: Flink provides two main APIs: the DataStream API and the Table API & SQL. Depending on your use case, they can be used on their own or mixed together.
The APIs map to dependencies as follows:
| API to use | Dependency to add |
| --- | --- |
| DataStream | flink-streaming-java |
| DataStream with Scala | flink-streaming-scala_2.12 |
| Table API | flink-table-api-java |
| Table API with Scala | flink-table-api-scala_2.12 |
| Table API + DataStream | flink-table-api-java-bridge |
| Table API + DataStream with Scala | flink-table-api-scala-bridge_2.12 |
 
The dependencies are split six ways here, by language (Java or Scala) and by whether the APIs are used separately or mixed. Which one you need depends on your application scenario, as the sketch below illustrates.
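For example, here is a minimal, self-contained sketch (the class name and sample data are placeholders, not part of your job) of the DataStream-to-Table and Table-to-DataStream conversions that only the bridge module provides:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BridgeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamTableEnvironment lives in flink-table-api-java-bridge; the plain
        // TableEnvironment from flink-table-api-java cannot convert to or from DataStreams.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // DataStream -> Table
        DataStream<String> words = env.fromElements("a", "b", "c");
        Table table = tableEnv.fromDataStream(words);

        // Table -> DataStream (insert-only results; use toChangelogStream for
        // updating results such as those produced by a canal-json source)
        DataStream<Row> rows = tableEnv.toDataStream(table);
        rows.print();

        env.execute("bridge-sketch");
    }
}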