上一篇博文和大家科普了 Apache Flink 的核心概念、优势和应用场景,很多小伙伴留言说“理论懂了,想动手试试”。今天就安排上实操干货——从环境搭建到核心场景代码实现,全程手把手教学,给出可直接复制运行的代码示例,新手也能快速上手 Flink 开发!
本文适配 Flink 1.17 版本(最稳定的主流版本),涵盖「环境搭建 → 批处理示例 → 流处理示例 → 常见问题排查」,所有代码均经过实测,复制就能跑通,建议收藏备用~
一、前置准备:Flink 环境快速搭建(Windows/Mac 通用)
入门阶段无需搭建复杂的集群,本地单机环境足够满足学习和测试需求,步骤如下,全程无坑:
1. 基础依赖安装
首先确保本地安装了以下两个工具(缺一不可),版本对应好更稳定:
- JDK 1.8+:Flink 基于 Java 开发,建议安装 JDK 1.8 或 11(亲测 1.8 最兼容),安装后配置 JAVA_HOME 环境变量;
- Maven 3.6+:用于管理项目依赖,后续创建 Flink 项目需要,配置好 Maven 镜像(阿里云镜像更快)。
2. 下载并启动 Flink 本地集群
步骤简单,跟着来就行:
- 下载 Flink 1.17.0 版本:官网下载地址(选择 scala_2.12 版本,兼容性更好);
- 解压压缩包到本地目录(比如 Mac 解压到 /Users/xxx/flink,Windows 解压到 D:\flink);
- 启动集群:进入解压后的 bin 目录,执行启动命令(终端/CMD 输入): Mac/Linux:./start-cluster.sh Windows:start-cluster.bat
- 验证启动:浏览器访问 http://localhost:8081,能看到 Flink 管理界面,说明集群启动成功(关闭集群用 stop-cluster.sh/bat)。
3. 开发工具配置(IDEA)
推荐用 IDEA 开发 Flink 项目,需安装 Flink 插件:打开 IDEA → Settings → Plugins → 搜索「Flink Plug-in」,安装后重启 IDEA,即可支持 Flink 代码高亮和快速创建项目。
二、核心场景实操:代码示例(批处理+流处理)
Flink 核心是“批流一体”,下面分别给出两个最基础、最常用的场景代码示例——批处理(处理本地文件数据)和流处理(处理实时模拟数据),均用 Java 编写(最主流,Python 版本附在末尾)。
场景1:批处理示例(统计本地文件中单词出现次数)
批处理适用于有边界的数据(比如本地文件、数据库表),这里以“统计文本文件中每个单词的出现次数”为例,步骤如下:
第一步:创建 Maven 项目,添加依赖
新建 Maven 项目,在 pom.xml 中添加 Flink 核心依赖(复制粘贴即可,无需修改):
<dependencies>
<!-- Flink 核心依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- Flink 批处理依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
</dependencies>
第二步:编写批处理代码
创建 WordCountBatch.java 类,代码如下(关键步骤有注释,易懂):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Flink 批处理示例:统计本地文件单词出现次数
*/
public class WordCountBatch {
public static void main(String[] args) throws Exception {
// 1. 创建批处理执行环境(核心入口)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 读取本地文件数据(替换为你的本地文件路径,文件内容每行一个单词,用空格分隔)
String filePath = "/Users/xxx/test.txt"; // Mac 路径示例
// String filePath = "D:\\test.txt"; // Windows 路径示例
DataSource<String> text = env.readTextFile(filePath);
// 3. 数据处理:切分单词 → 分组 → 统计次数
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
// 切分单词,将每行文本拆分为单个单词,每个单词标记为 (单词, 1)
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" "); // 按空格切分
for (String word : words) {
out.collect(new Tuple2<>(word, 1)); // 输出 (单词, 1)
}
}
})
// 按单词分组(Tuple2 的第一个元素,即单词)
.groupBy(0)
// 统计每组的次数(Tuple2 的第二个元素,求和)
.sum(1)
// 4. 输出结果(打印到控制台)
.print();
}
}
第三步:测试运行
1. 本地创建 test.txt 文件,写入内容(示例):
flink flink spark hadoop
flink hadoop spark spark
hive flink flink
2. 修改代码中的 filePath 为你的 test.txt 实际路径;
3. 运行 main 方法,控制台输出结果(单词+出现次数):
(hadoop,2)
(spark,3)
(flink,5)
(hive,1)
至此,批处理示例运行成功,核心逻辑就是“读取数据 → 处理数据 → 输出结果”。
场景2:流处理示例(处理实时模拟数据流)
流处理适用于无边界的数据(比如实时日志、订单流),这里以“模拟实时单词流,统计单词出现次数”为例,代码可直接运行,步骤如下:
第一步:添加流处理依赖
在 pom.xml 中新增流处理依赖(和批处理依赖不冲突,可共存):
<!-- Flink 流处理依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
<scope>provided</scope>
</dependency>
第二步:编写流处理代码
创建 WordCountStream.java 类,代码如下(模拟实时数据流,关键步骤有注释):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Flink 流处理示例:处理实时模拟单词流,统计单词出现次数
*/
public class WordCountStream {
public static void main(String[] args) throws Exception {
// 1. 创建流处理执行环境(核心入口,和批处理环境不同)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 模拟实时数据流(从集合中生成,实际场景中可替换为 Kafka 等数据源)
DataStream<String> stream = env.fromElements(
"flink spark", "flink hadoop", "spark flink",
"hadoop flink", "flink spark", "hive flink"
);
// 3. 数据处理:切分单词 → 分组 → 统计次数(和批处理逻辑类似,但返回流数据)
DataStream<Tuple2<String, Integer>> resultStream = stream.flatMap(
new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
)
// 按单词分组(流处理中用 keyBy,和批处理的 groupBy 区别)
.keyBy(0)
// 统计次数(求和)
.sum(1);
// 4. 输出结果(打印到控制台,流处理会持续监听数据)
resultStream.print();
// 5. 启动流处理(流处理必须调用 execute() 方法,否则不会执行)
env.execute("Flink WordCount Stream");
}
}
第三步:测试运行
直接运行 main 方法,控制台会实时输出每个单词的累计出现次数(流处理会逐条处理数据,每接收一条数据就更新一次统计结果):
3> (flink,1)
5> (spark,1)
3> (flink,2)
7> (hadoop,1)
5> (spark,2)
3> (flink,3)
7> (hadoop,2)
3> (flink,4)
5> (spark,3)
8> (hive,1)
3> (flink,5)
注:输出前面的数字(3>、5>)是 Flink 的并行度标识,默认并行度和本地 CPU 核心数一致,可通过 env.setParallelism(1) 设置为单并行度,避免多线程干扰。
三、进阶补充:Python 版本代码示例(可选)
如果习惯用 Python 开发,这里给出对应场景的 Python 代码(需先安装 Flink Python 依赖:pip install apache-flink),同样可直接运行:
1. 批处理(Python 版)
from pyflink.datastream import ExecutionEnvironment
# 1. 创建批处理环境
env = ExecutionEnvironment.get_execution_environment()
# 2. 读取本地文件
text = env.read_text_file("/Users/xxx/test.txt") # 替换为你的文件路径
# 3. 处理数据:切分单词 → 分组 → 统计
result = text \
.flat_map(lambda x: x.split(" ")) \
.map(lambda x: (x, 1)) \
.group_by(0) \
.sum(1)
# 4. 输出结果
result.print()
2. 流处理(Python 版)
from pyflink.datastream import StreamExecutionEnvironment
# 1. 创建流处理环境
env = StreamExecutionEnvironment.get_execution_environment()
# 2. 模拟实时数据流
stream = env.from_collection([
"flink spark", "flink hadoop", "spark flink",
"hadoop flink", "flink spark", "hive flink"
])
# 3. 处理数据
result_stream = stream \
.flat_map(lambda x: x.split(" ")) \
.map(lambda x: (x, 1)) \
.key_by(lambda x: x[0]) \
.sum(1)
# 4. 输出并启动
result_stream.print()
env.execute("Flink WordCount Stream (Python)")
四、常见问题排查(新手必看)
运行代码时可能会遇到以下问题,提前整理好解决方案,避免踩坑:
- 问题1:启动集群失败,提示“JAVA_HOME is not set” → 解决方案:重新配置 JAVA_HOME 环境变量,重启终端/IDEA,确认 java -version 能正常输出;
- 问题2:流处理代码运行无输出 → 解决方案:流处理必须调用 env.execute() 方法,否则程序不会执行,检查代码末尾是否有该方法;
- 问题3:依赖报错,提示“找不到类” → 解决方案:确认 pom.xml 中依赖的版本和 Flink 版本一致(均为 1.17.0),刷新 Maven 依赖,重启 IDEA;
- 问题4:本地文件读取失败 → 解决方案:检查文件路径是否正确,Windows 路径需用双反斜杠(D:\\test.txt),Mac 路径注意权限问题。
五、后续学习方向
本文给出的是最基础的入门示例,实际开发中还有更多核心知识点需要掌握,推荐学习顺序:
- 掌握 Flink 核心 API:DataStream(流处理)、DataSet(批处理),理解算子的使用(map、flatMap、keyBy 等);
- 学习窗口机制:流处理中常用的滚动窗口、滑动窗口,解决“统计一段时间内的数据”场景;
- 接入真实数据源:学习 Flink 对接 Kafka、MySQL、HDFS 等,替代模拟数据;
- 部署到集群:将本地代码部署到 Flink 集群,适应生产环境。
如果遇到具体问题,欢迎在评论区留言,后续会持续更新 Flink 进阶实操(比如状态管理、容错机制、实时 ETL 实战),感兴趣的可以关注一波~
最后提醒:所有代码均已实测,复制时注意替换文件路径等个性化配置,动手跑一遍,比单纯看理论更有收获!
#Flink #大数据实操 #Flink代码示例 #实时计算 #大数据入门