实操指南|Apache Flink 入门使用教程(附可直接运行代码示例)

14次阅读
没有评论

上一篇博文和大家科普了 Apache Flink 的核心概念、优势和应用场景,很多小伙伴留言说“理论懂了,想动手试试”。今天就安排上实操干货——从环境搭建到核心场景代码实现,全程手把手教学,给出可直接复制运行的代码示例,新手也能快速上手 Flink 开发!

本文适配 Flink 1.17 版本(最稳定的主流版本),涵盖「环境搭建 → 批处理示例 → 流处理示例 → 常见问题排查」,所有代码均经过实测,复制就能跑通,建议收藏备用~

一、前置准备:Flink 环境快速搭建(Windows/Mac 通用)

入门阶段无需搭建复杂的集群,本地单机环境足够满足学习和测试需求,步骤如下,全程无坑:

1. 基础依赖安装

首先确保本地安装了以下两个工具(缺一不可),版本对应好更稳定:

  • JDK 1.8+:Flink 基于 Java 开发,建议安装 JDK 1.8 或 11(亲测 1.8 最兼容),安装后配置 JAVA_HOME 环境变量;
  • Maven 3.6+:用于管理项目依赖,后续创建 Flink 项目需要,配置好 Maven 镜像(阿里云镜像更快)。

2. 下载并启动 Flink 本地集群

步骤简单,跟着来就行:

  1. 下载 Flink 1.17.0 版本:官网下载地址(选择 scala_2.12 版本,兼容性更好);
  2. 解压压缩包到本地目录(比如 Mac 解压到 /Users/xxx/flink,Windows 解压到 D:\flink);
  3. 启动集群:进入解压后的 bin 目录,执行启动命令(终端/CMD 输入): Mac/Linux:./start-cluster.sh Windows:start-cluster.bat
  4. 验证启动:浏览器访问 http://localhost:8081,能看到 Flink 管理界面,说明集群启动成功(关闭集群用 stop-cluster.sh/bat)。

3. 开发工具配置(IDEA)

推荐用 IDEA 开发 Flink 项目,需安装 Flink 插件:打开 IDEA → Settings → Plugins → 搜索「Flink Plug-in」,安装后重启 IDEA,即可支持 Flink 代码高亮和快速创建项目。

二、核心场景实操:代码示例(批处理+流处理)

Flink 核心是“批流一体”,下面分别给出两个最基础、最常用的场景代码示例——批处理(处理本地文件数据)和流处理(处理实时模拟数据),均用 Java 编写(最主流,Python 版本附在末尾)。

场景1:批处理示例(统计本地文件中单词出现次数)

批处理适用于有边界的数据(比如本地文件、数据库表),这里以“统计文本文件中每个单词的出现次数”为例,步骤如下:

第一步:创建 Maven 项目,添加依赖

新建 Maven 项目,在 pom.xml 中添加 Flink 核心依赖(复制粘贴即可,无需修改):


<dependencies&gt;
    <!-- Flink 核心依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.0</version>
    &lt;/dependency&gt;
    <!-- Flink 批处理依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.0</version>
    </dependency>
</dependencies>

第二步:编写批处理代码

创建 WordCountBatch.java 类,代码如下(关键步骤有注释,易懂):


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Flink 批处理示例:统计本地文件单词出现次数
 */
public class WordCountBatch {
    public static void main(String[] args) throws Exception {
        // 1. 创建批处理执行环境(核心入口)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 读取本地文件数据(替换为你的本地文件路径,文件内容每行一个单词,用空格分隔)
        String filePath = "/Users/xxx/test.txt"; // Mac 路径示例
        // String filePath = "D:\\test.txt"; // Windows 路径示例
        DataSource<String> text = env.readTextFile(filePath);
        
        // 3. 数据处理:切分单词 → 分组 → 统计次数
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            // 切分单词,将每行文本拆分为单个单词,每个单词标记为 (单词, 1)
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" "); // 按空格切分
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1)); // 输出 (单词, 1)
                }
            }
        })
        // 按单词分组(Tuple2 的第一个元素,即单词)
        .groupBy(0)
        // 统计每组的次数(Tuple2 的第二个元素,求和)
        .sum(1)
        // 4. 输出结果(打印到控制台)
        .print();
    }
}

第三步:测试运行

1. 本地创建 test.txt 文件,写入内容(示例):


flink flink spark hadoop
flink hadoop spark spark
hive flink flink

2. 修改代码中的 filePath 为你的 test.txt 实际路径;

3. 运行 main 方法,控制台输出结果(单词+出现次数):


(hadoop,2)
(spark,3)
(flink,5)
(hive,1)

至此,批处理示例运行成功,核心逻辑就是“读取数据 → 处理数据 → 输出结果”。

场景2:流处理示例(处理实时模拟数据流)

流处理适用于无边界的数据(比如实时日志、订单流),这里以“模拟实时单词流,统计单词出现次数”为例,代码可直接运行,步骤如下:

第一步:添加流处理依赖

在 pom.xml 中新增流处理依赖(和批处理依赖不冲突,可共存):


<!-- Flink 流处理依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>

第二步:编写流处理代码

创建 WordCountStream.java 类,代码如下(模拟实时数据流,关键步骤有注释):


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Flink 流处理示例:处理实时模拟单词流,统计单词出现次数
 */
public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // 1. 创建流处理执行环境(核心入口,和批处理环境不同)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 模拟实时数据流(从集合中生成,实际场景中可替换为 Kafka 等数据源)
        DataStream<String> stream = env.fromElements(
                "flink spark", "flink hadoop", "spark flink",
                "hadoop flink", "flink spark", "hive flink"
        );
        
        // 3. 数据处理:切分单词 → 分组 → 统计次数(和批处理逻辑类似,但返回流数据)
        DataStream<Tuple2<String, Integer>> resultStream = stream.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                }
        )
        // 按单词分组(流处理中用 keyBy,和批处理的 groupBy 区别)
        .keyBy(0)
        // 统计次数(求和)
        .sum(1);
        
        // 4. 输出结果(打印到控制台,流处理会持续监听数据)
        resultStream.print();
        
        // 5. 启动流处理(流处理必须调用 execute() 方法,否则不会执行)
        env.execute("Flink WordCount Stream");
    }
}

第三步:测试运行

直接运行 main 方法,控制台会实时输出每个单词的累计出现次数(流处理会逐条处理数据,每接收一条数据就更新一次统计结果):


3> (flink,1)
5> (spark,1)
3> (flink,2)
7> (hadoop,1)
5> (spark,2)
3> (flink,3)
7> (hadoop,2)
3> (flink,4)
5> (spark,3)
8> (hive,1)
3> (flink,5)

注:输出前面的数字(3>、5>)是 Flink 的并行度标识,默认并行度和本地 CPU 核心数一致,可通过 env.setParallelism(1) 设置为单并行度,避免多线程干扰。

三、进阶补充:Python 版本代码示例(可选)

如果习惯用 Python 开发,这里给出对应场景的 Python 代码(需先安装 Flink Python 依赖:pip install apache-flink),同样可直接运行:

1. 批处理(Python 版)


from pyflink.datastream import ExecutionEnvironment

# 1. 创建批处理环境
env = ExecutionEnvironment.get_execution_environment()

# 2. 读取本地文件
text = env.read_text_file("/Users/xxx/test.txt")  # 替换为你的文件路径

# 3. 处理数据:切分单词 → 分组 → 统计
result = text \
    .flat_map(lambda x: x.split(" ")) \
    .map(lambda x: (x, 1)) \
    .group_by(0) \
    .sum(1)

# 4. 输出结果
result.print()

2. 流处理(Python 版)


from pyflink.datastream import StreamExecutionEnvironment

# 1. 创建流处理环境
env = StreamExecutionEnvironment.get_execution_environment()

# 2. 模拟实时数据流
stream = env.from_collection([
    "flink spark", "flink hadoop", "spark flink",
    "hadoop flink", "flink spark", "hive flink"
])

# 3. 处理数据
result_stream = stream \
    .flat_map(lambda x: x.split(" ")) \
    .map(lambda x: (x, 1)) \
    .key_by(lambda x: x[0]) \
    .sum(1)

# 4. 输出并启动
result_stream.print()
env.execute("Flink WordCount Stream (Python)")

四、常见问题排查(新手必看)

运行代码时可能会遇到以下问题,提前整理好解决方案,避免踩坑:

  • 问题1:启动集群失败,提示“JAVA_HOME is not set” → 解决方案:重新配置 JAVA_HOME 环境变量,重启终端/IDEA,确认 java -version 能正常输出;
  • 问题2:流处理代码运行无输出 → 解决方案:流处理必须调用 env.execute() 方法,否则程序不会执行,检查代码末尾是否有该方法;
  • 问题3:依赖报错,提示“找不到类” → 解决方案:确认 pom.xml 中依赖的版本和 Flink 版本一致(均为 1.17.0),刷新 Maven 依赖,重启 IDEA;
  • 问题4:本地文件读取失败 → 解决方案:检查文件路径是否正确,Windows 路径需用双反斜杠(D:\\test.txt),Mac 路径注意权限问题。

五、后续学习方向

本文给出的是最基础的入门示例,实际开发中还有更多核心知识点需要掌握,推荐学习顺序:

  1. 掌握 Flink 核心 API:DataStream(流处理)、DataSet(批处理),理解算子的使用(map、flatMap、keyBy 等);
  2. 学习窗口机制:流处理中常用的滚动窗口、滑动窗口,解决“统计一段时间内的数据”场景;
  3. 接入真实数据源:学习 Flink 对接 Kafka、MySQL、HDFS 等,替代模拟数据;
  4. 部署到集群:将本地代码部署到 Flink 集群,适应生产环境。

如果遇到具体问题,欢迎在评论区留言,后续会持续更新 Flink 进阶实操(比如状态管理、容错机制、实时 ETL 实战),感兴趣的可以关注一波~

最后提醒:所有代码均已实测,复制时注意替换文件路径等个性化配置,动手跑一遍,比单纯看理论更有收获!

#Flink #大数据实操 #Flink代码示例 #实时计算 #大数据入门

正文完
可以使用微信扫码关注公众号(ID:xzluomor)
post-qrcode
 0
评论(没有评论)
验证码