在数据开发、数仓运维、自动化报表场景中,有一个非常高频的需求:从MySQL源表拉取数据 → Python清洗、转换、统计分析 → 处理后的新数据写入MySQL目标表。
如果单纯用本地脚本实现,会面临定时调度失败、日志丢失、任务监控缺失、无法可视化运维等问题。而 DolphinScheduler 作为开源分布式工作流调度工具,完美解决了这些痛点,非常适合承载这类轻量级数据ETL流程。
今天这篇博文,手把手带大家实现「DolphinScheduler + Python + MySQL」完整数据流转流程,全程可直接落地生产。
一、业务场景说明
- 数据落地:将清洗后的标准数据,写入 MySQL 目标表
user_clean
整个流程由 DolphinScheduler 统一调度,支持定时执行、失败重试、日志查看、任务监控。
二、前置环境准备
在开发前,确保环境满足以下条件,避免运行报错:
1. 基础环境
- 已部署 DolphinScheduler 服务(单机/集群均可,版本推荐 3.0+)
- 调度机器已安装 Python3 环境
- MySQL 数据库可正常连接,开放远程访问权限
2. 安装依赖库
Python 操作 MySQL 主流使用 pymysql,数据处理使用 pandas,在 DolphinScheduler 执行机器执行安装命令:
pip install pymysql pandas
3. 数据库表准备
创建源表和目标表(测试用极简表结构)
源表 user_original
CREATE TABLE `user_original` (
`id` INT PRIMARY KEY AUTO_INCREMENT,
`username` VARCHAR(50) DEFAULT NULL,
`age` VARCHAR(20) DEFAULT NULL,
`status` VARCHAR(20) DEFAULT NULL,
`create_time` VARCHAR(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
目标表 user_clean(结构化、标准化字段)
CREATE TABLE `user_clean` (
`id` INT PRIMARY KEY AUTO_INCREMENT,
`username` VARCHAR(50) DEFAULT NULL,
`age` INT DEFAULT 0,
`status` TINYINT DEFAULT 0,
`create_time` DATETIME DEFAULT NULL,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
三、核心Python代码实现
代码核心逻辑:读取源MySQL数据 → 清洗转换 → 批量写入目标MySQL。全程解耦配置,支持直接在DolphinScheduler中运行。
脚本命名:mysql_etl_process.py
import pymysql
import pandas as pd
from datetime import datetime
# ===================== 数据库配置(根据自己环境修改)=====================
# 源库配置
SOURCE_DB = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "你的数据库密码",
"database": "test_db",
"charset": "utf8mb4"
}
# 目标库配置(可与源库为同一库)
TARGET_DB = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "你的数据库密码",
"database": "test_db",
"charset": "utf8mb4"
}
# ====================================================================
def read_mysql_data():
"""从MySQL读取原始数据"""
try:
# 建立数据库连接
conn = pymysql.connect(**SOURCE_DB)
sql = "SELECT username, age, status, create_time FROM user_original"
# 读取数据为DataFrame格式,方便后续处理
df = pd.read_sql(sql, conn)
conn.close()
print(f"【数据读取成功】共读取{len(df)}条原始数据")
return df
except Exception as e:
print(f"【读取数据失败】错误信息:{str(e)}")
return None
def process_data(df):
"""Python数据清洗与转换核心逻辑"""
if df is None or len(df) == 0:
print("【无待处理数据】")
return df
# 1. 去除空值数据
df = df.dropna(subset=["username"])
# 2. 年龄字段转为int类型,异常数据填充0
df["age"] = pd.to_numeric(df["age"], errors="coerce").fillna(0).astype(int)
# 3. 状态字段映射:有效=1,无效=0
status_map = {"正常": 1, "禁用": 0, "1": 1, "0": 0}
df["status"] = df["status"].map(status_map).fillna(0).astype(int)
# 4. 时间字段标准化为DATETIME格式
df["create_time"] = pd.to_datetime(df["create_time"], errors="coerce")
# 5. 新增更新时间
df["update_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"【数据处理完成】清洗后有效数据{len(df)}条")
return df
def write_mysql_data(df):
"""处理后数据批量写入目标MySQL表"""
if df is None or len(df) == 0:
print("【无数据可写入】")
return
try:
conn = pymysql.connect(**TARGET_DB)
cursor = conn.cursor()
# 批量插入SQL语句
insert_sql = """
INSERT INTO user_clean (username, age, status, create_time, update_time)
VALUES (%s, %s, %s, %s, %s)
"""
# 组装批量数据
data_list = df.values.tolist()
# 批量执行插入
cursor.executemany(insert_sql, data_list)
conn.commit()
print(f"【数据写入成功】共写入{cursor.rowcount}条数据")
except Exception as e:
conn.rollback()
print(f"【数据写入失败】错误信息:{str(e)}")
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
# 执行完整ETL流程
source_data = read_mysql_data()
clean_data = process_data(source_data)
write_mysql_data(clean_data)
四、DolphinScheduler 任务配置步骤
代码编写完成后,核心步骤就是在 DolphinScheduler 中创建工作流、配置Python任务,实现自动化调度。
1. 上传脚本至服务器
将 mysql_etl_process.py 上传至 DolphinScheduler 执行机器的固定目录,例如:/opt/dolphinscheduler/scripts/,并赋予执行权限:
chmod +x /opt/dolphinscheduler/scripts/mysql_etl_process.py
2. 创建Python任务节点
- 登录 DolphinScheduler 后台,进入对应项目,新建工作流
- 从左侧组件栏,拖拽 Python 节点 到画布中
- 双击节点,配置任务信息:
- 任务名称:MySQL数据ETL处理
- 运行脚本:选择「脚本路径」,填写完整脚本路径
/opt/dolphinscheduler/scripts/mysql_etl_process.py - 执行环境:选择已安装依赖的Python3环境
- 重试策略:根据需求配置(建议重试1-2次,间隔30秒)
3. 配置调度规则
根据业务需求设置定时规则,例如:
- 每小时执行一次:
0 * * * * - 每日凌晨2点执行:
0 2 * * *
保存工作流,手动触发一次测试运行。
五、任务测试与日志查看
1. 手动运行测试
点击工作流「手动运行」,等待任务执行完成,状态显示「成功」即为正常。
2. 日志排查问题
若任务运行失败,可点击节点「查看日志」,常见日志信息:
- 读取条数、清洗后数据量、写入条数,方便核对数据完整性
- 数据库连接失败、权限不足、字段不匹配等报错信息,快速定位问题
3. 数据结果校验
登录MySQL,查询目标表 user_clean,确认数据已成功写入、字段格式标准化、无脏数据。
六、常见踩坑总结
1. 依赖库报错:ModuleNotFoundError
原因:DolphinScheduler 执行机器的Python环境未安装 pandas/pymysql
解决:在执行任务的worker节点执行pip安装,而非本地机器
2. MySQL连接超时/拒绝连接
原因:数据库未开放远程权限、防火墙拦截、账号密码错误
解决:授权数据库远程访问,关闭服务器防火墙,核对数据库配置参数
3. 数据写入重复
原因:定时任务重复执行,无去重逻辑
优化:可增加每日清空目标表或根据唯一字段去重更新逻辑
4. 脚本权限不足
原因:dolphinscheduler用户无脚本读取、执行权限
解决:修改脚本目录权限,授权dolphinscheduler用户访问
七、进阶优化方向
- 配置解耦:将数据库账号、密码写入配置文件,避免硬编码泄露
- 增量同步:增加时间戳条件,只读取新增/修改数据,全量改增量,提升效率
- 异常告警:对接钉钉/企业微信机器人,任务失败自动推送告警消息
- 事务控制:增加批量事务,保证数据写入原子性,避免部分写入脏数据
- 参数化调度:通过DolphinScheduler全局参数,动态传递同步时间、表名
八、总结
DolphinScheduler + Python + MySQL 的组合,是中小型数据ETL、自动化数据处理的最优轻量方案之一。
相比于原生Shell脚本、本地Python脚本,它具备可视化调度、任务监控、失败重试、日志追溯、定时自动化的能力,完全可以满足日常数据清洗、同步、统计报表等业务需求,上手简单、落地成本极低。
本篇文章的完整代码、表结构、配置步骤均可直接复用,大家可以根据自身业务的表结构、处理逻辑灵活修改!