准备工作

  • 基于 UNIX 环境

如果使用的是 Windows 环境,可以安装虚拟机软件(如 VMware Workstation Pro)并配置一个 Linux 系统(如 CentOS 8,Ubuntu 等)的虚拟机,也可以安装 Cygwin( 一个 Windows 下的 Linux 环境),或者配置 WSL (Windows Subsystem for Linux, Windows 10 中新加的功能)。

这里运行环境为 Windows 10 的 Linux 子系统:

1
2
3
4
➜  /opt cat /proc/version
Linux version 4.4.0-19041-Microsoft (Microsoft@Microsoft.com) (gcc version 5.4.0 (GCC) ) #488-Microsoft Mon Sep 01 13:43:00 PST 2020
➜ /opt cat /etc/issue
Ubuntu 20.04.4 LTS \n \l
  • 安装 Java 8
1
2
3
4
➜  /opt java -version
java version "1.8.0_311"
Java(TM) SE Runtime Environment (build 1.8.0_311-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.311-b11, mixed mode)
  • 安装 Maven
1
2
3
4
5
6
➜  /opt mvn --version
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /mnt/d/opt/maven
Java version: 1.8.0_311, vendor: Oracle Corporation, runtime: /mnt/d/opt/java/jdk1.8.0_311/jre
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-19041-microsoft", arch: "amd64", family: "unix"
  1. 从 Apache Flink 官网(https://flink.apache.org/)下载基于 Scala 2.12 的 Apache Flink 1.15.2 二进制发行版。
    1
    wget https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
  2. 解压文件:
    1
    tar -xzf flink-1.15.2-bin-scala_2.12.tgz
  3. 进入 Flink 目录:
    1
    cd flink-1.15.2
  4. 启动本地 Flink 集群:
    1
    2
    ./bin/start-cluster.sh

    输出如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    ➜  /opt wget https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
    ➜ /opt
    ➜ /opt tar -xzf flink-1.15.2-bin-scala_2.12.tgz
    ➜ /opt
    ➜ /opt cd flink-1.15.2
    ➜ flink-1.15.2
    ➜ flink-1.15.2 ./bin/start-cluster.sh
    Starting cluster.
    Starting standalonesession daemon on host airoo1Oi.
    Starting taskexecutor daemon on host airoo1Oi.
  5. 在浏览器中输入 http://localhost:8081/,打开 Flink Web UI: Apache Flink Web UI 概览页 Apache Flink Web UI 概览页 可以看到 Flink 集群信息:一个 TaskManager(Flink 工作进程),一个可用的 Task Slot(任务槽,TaskManager 所提供的资源单元)。

基于Flink实现词频统计功能

以下步骤详细介绍了如何使用 Flink 实时统计从 Socket 接收到的字符串中单词出现次数。

使用Maven命令创建项目

Maven命令

1
2
3
4
mvn archetype:generate                \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.15.2

在执行该命令的过程中会提示输入 groupId、artifactId、version 和 package。

这里填入的是:

1
2
3
4
groupId: com.johnson.flink
artifactId: flink-tutorial
version: 1.0-SNAPSHOT
package: com.johnson.flink

项目目录如下:

1
2
3
4
5
6
7
8
9
10
11
12
➜  flink-tutorial tree
.
├── pom.xml
└── src
└── main
├── java
│   └── com
│   └── johnson
│   └── flink
│   └── DataStreamJob.java
└── resources
└── log4j2.properties

编码

新建 Java 类 WordCountJob 并放在 job 包下,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.johnson.flink.job;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* 词频统计
* 统计从 Socket 接收到的字符串中单词出现次数。
*
* @author johnson lin
* @date 2022/10/9 20:21
*/
public class WordCountJob {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Socket 地址、端口
String hostname = "127.0.0.1";
int port = 3000;

// Source:读取数据
DataStreamSource<String> stream = env.socketTextStream(hostname, port);

// Transform:将每行数据按任何非单词字符分割成单词,再按单词分组统计单词数量
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new WordCountFlatMap())
.keyBy(p -> p.f0).sum(1).uid("Transform");

// Sink:直接打印输出
sum.print();

// 执行程序
env.execute("词频统计");
}

public static class WordCountFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
// 先转为小写,再按任何非单词字符进行分割
String[] words = s.toLowerCase().split("\\W+");

for (String word : words) {
if (word.length() > 0) {
// 回收器回收长度大于0的单词,将该单词的出现次数计为1
collector.collect(new Tuple2<>(word, 1));
}
}
}
}
}

打包程序

命令:

1
mvn clean package -DskipTests

将打包好的 jar 包复制到 Flink 目录下:

1
2
➜  flink-tutorial cp target/flink-tutorial-1.0-SNAPSHOT.jar /opt/flink-1.15.2
➜ flink-tutorial

进入 Flink 安装目录,此时 Flink 目录如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
➜  flink-tutorial cd /opt/flink-1.15.2
➜ flink-1.15.2 ll
total 732K
-rw-r--r-- 1 1709996467 staff 12K Aug 17 20:10 LICENSE
-rw-r--r-- 1 1709996467 staff 600K Aug 18 00:42 NOTICE
-rw-r--r-- 1 1709996467 staff 1.3K Aug 17 20:10 README.txt
drwxr-xr-x 1 1709996467 staff 4.0K Aug 18 00:42 bin
drwxr-xr-x 1 1709996467 staff 4.0K Aug 18 00:42 conf
drwxr-xr-x 1 1709996467 staff 4.0K Aug 18 00:42 examples
-rwxr-xr-x 1 root root 8.1K Oct 10 21:24 flink-tutorial-1.0-SNAPSHOT.jar
drwxr-xr-x 1 1709996467 staff 4.0K Aug 18 00:42 lib
drwxr-xr-x 1 1709996467 staff 4.0K Aug 18 00:42 licenses
drwxr-xr-x 1 1709996467 staff 4.0K Oct 9 17:12 log
drwxr-xr-x 1 1709996467 staff 4.0K Aug 18 00:42 opt
drwxr-xr-x 1 1709996467 staff 4.0K Aug 18 00:42 plugins
➜ flink-tutorial

使用 nc 命令监听 3000 端口

命令:

1
nc -l 3000

运行程序

进行 Flink 目录,指定应用程序的入口类和 jar 文件,在本地集群上运行作业:

1
./bin/flink run -c com.johnson.flink.job.WordCountJob flink-tutorial-1.0-SNAPSHOT.jar

返回结果显示作业已提交,作业的 ID 为 47a66fb7968d8b04dfe574f520bde7af,如下:

1
2
➜  flink-1.15.2 ./bin/flink run -c com.johnson.flink.job.WordCountJob flink-tutorial-1.0-SNAPSHOT.jar
Job has been submitted with JobID 47a66fb7968d8b04dfe574f520bde7af

打开 Flink Web UI,可以看到“Running Jobs List“列表中有一个作业。点击该作业,可以看到该作业的运行情况概览:数据流程图、算子的实时指标等:

展示运行作业的 Apache Flink Web UI 截图

展示运行作业的 Apache Flink Web UI 截图

作业的打印输出(sum.print())会写入 Flink 工作进程的标准输出中,默认情况下它会重定向到 Flink 安装目录的 log 子目录下的文件。可以使用 tail 命令查看输出的 log 文件来观察统计结果:

1
tail -f ./log/flink-<user>-taskexecutor-<n>-<hostname>.out

如查看本示例的输出结果:

1
tail -f log/flink-root-taskexecutor-1-airoo1Oi.out

nc 命令的窗口中,输入以下内容:

1
2
3
a is for apple
b is for ball
c is for cake

可以看到文件中写入了和下面类似的文本行:

1
2
3
4
5
6
7
8
9
10
11
12
13
➜  flink-1.15.2 tail -f log/flink-root-taskexecutor-1-airoo1Oi.out
(a,1)
(is,1)
(for,1)
(apple,1)
(b,1)
(is,2)
(for,2)
(ball,1)
(c,1)
(is,3)
(for,3)
(cake,1)

第一字段是单词,第二个字段是该单词当前出现的频率。

停止程序

在 Flink Web UI 选定作业进入作业的详情页面,然后单击页面右上方的 Cancel Job 按钮取消作业。

停止Flink集群

进入 Flink 安装目录,运行以下命令停止集群:

1
./bin/stop-cluster.sh

小结

本文讲述了如何安装、启动 Flink 本地集群,并运行一个词频统计的 Flink 流式程序,让你对 Flink 有一个初步印象。

(END)