DolphinScheduler 实战:Python 读取MySQL数据,处理后再写入MySQL

17次阅读
没有评论

在数据开发、数仓运维、自动化报表场景中,有一个非常高频的需求:从MySQL源表拉取数据 → Python清洗、转换、统计分析 → 处理后的新数据写入MySQL目标表

如果单纯用本地脚本实现,会面临定时调度失败、日志丢失、任务监控缺失、无法可视化运维等问题。而 DolphinScheduler 作为开源分布式工作流调度工具,完美解决了这些痛点,非常适合承载这类轻量级数据ETL流程。

今天这篇博文,手把手带大家实现「DolphinScheduler + Python + MySQL」完整数据流转流程,全程可直接落地生产。

一、业务场景说明

  1. 数据落地:将清洗后的标准数据,写入 MySQL 目标表 user_clean

整个流程由 DolphinScheduler 统一调度,支持定时执行、失败重试、日志查看、任务监控。

二、前置环境准备

在开发前,确保环境满足以下条件,避免运行报错:

1. 基础环境

  • 已部署 DolphinScheduler 服务(单机/集群均可,版本推荐 3.0+)
  • 调度机器已安装 Python3 环境
  • MySQL 数据库可正常连接,开放远程访问权限

2. 安装依赖库

Python 操作 MySQL 主流使用 pymysql,数据处理使用 pandas,在 DolphinScheduler 执行机器执行安装命令:

pip install pymysql pandas

3. 数据库表准备

创建源表和目标表(测试用极简表结构)

源表 user_original

CREATE TABLE `user_original` (
  `id` INT PRIMARY KEY AUTO_INCREMENT,
  `username` VARCHAR(50) DEFAULT NULL,
  `age` VARCHAR(20) DEFAULT NULL,
  `status` VARCHAR(20) DEFAULT NULL,
  `create_time` VARCHAR(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

目标表 user_clean(结构化、标准化字段)

CREATE TABLE `user_clean` (
  `id` INT PRIMARY KEY AUTO_INCREMENT,
  `username` VARCHAR(50) DEFAULT NULL,
  `age` INT DEFAULT 0,
  `status` TINYINT DEFAULT 0,
  `create_time` DATETIME DEFAULT NULL,
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

三、核心Python代码实现

代码核心逻辑:读取源MySQL数据 → 清洗转换 → 批量写入目标MySQL。全程解耦配置,支持直接在DolphinScheduler中运行。

脚本命名:mysql_etl_process.py

import pymysql
import pandas as pd
from datetime import datetime

# ===================== 数据库配置(根据自己环境修改)=====================
# 源库配置
SOURCE_DB = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "password": "你的数据库密码",
    "database": "test_db",
    "charset": "utf8mb4"
}

# 目标库配置(可与源库为同一库)
TARGET_DB = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "password": "你的数据库密码",
    "database": "test_db",
    "charset": "utf8mb4"
}
# ====================================================================

def read_mysql_data():
    """从MySQL读取原始数据"""
    try:
        # 建立数据库连接
        conn = pymysql.connect(**SOURCE_DB)
        sql = "SELECT username, age, status, create_time FROM user_original"
        # 读取数据为DataFrame格式,方便后续处理
        df = pd.read_sql(sql, conn)
        conn.close()
        print(f"【数据读取成功】共读取{len(df)}条原始数据")
        return df
    except Exception as e:
        print(f"【读取数据失败】错误信息:{str(e)}")
        return None

def process_data(df):
    """Python数据清洗与转换核心逻辑"""
    if df is None or len(df) == 0:
        print("【无待处理数据】")
        return df

    # 1. 去除空值数据
    df = df.dropna(subset=["username"])
    # 2. 年龄字段转为int类型,异常数据填充0
    df["age"] = pd.to_numeric(df["age"], errors="coerce").fillna(0).astype(int)
    # 3. 状态字段映射:有效=1,无效=0
    status_map = {"正常": 1, "禁用": 0, "1": 1, "0": 0}
    df["status"] = df["status"].map(status_map).fillna(0).astype(int)
    # 4. 时间字段标准化为DATETIME格式
    df["create_time"] = pd.to_datetime(df["create_time"], errors="coerce")
    # 5. 新增更新时间
    df["update_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    print(f"【数据处理完成】清洗后有效数据{len(df)}条")
    return df

def write_mysql_data(df):
    """处理后数据批量写入目标MySQL表"""
    if df is None or len(df) == 0:
        print("【无数据可写入】")
        return

    try:
        conn = pymysql.connect(**TARGET_DB)
        cursor = conn.cursor()
        # 批量插入SQL语句
        insert_sql = """
        INSERT INTO user_clean (username, age, status, create_time, update_time)
        VALUES (%s, %s, %s, %s, %s)
        """
        # 组装批量数据
        data_list = df.values.tolist()
        # 批量执行插入
        cursor.executemany(insert_sql, data_list)
        conn.commit()
        print(f"【数据写入成功】共写入{cursor.rowcount}条数据")
    except Exception as e:
        conn.rollback()
        print(f"【数据写入失败】错误信息:{str(e)}")
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    # 执行完整ETL流程
    source_data = read_mysql_data()
    clean_data = process_data(source_data)
    write_mysql_data(clean_data)

四、DolphinScheduler 任务配置步骤

代码编写完成后,核心步骤就是在 DolphinScheduler 中创建工作流、配置Python任务,实现自动化调度。

1. 上传脚本至服务器

mysql_etl_process.py 上传至 DolphinScheduler 执行机器的固定目录,例如:/opt/dolphinscheduler/scripts/,并赋予执行权限:

chmod +x /opt/dolphinscheduler/scripts/mysql_etl_process.py

2. 创建Python任务节点

  1. 登录 DolphinScheduler 后台,进入对应项目,新建工作流
  2. 从左侧组件栏,拖拽 Python 节点 到画布中
  3. 双击节点,配置任务信息:
    1. 任务名称:MySQL数据ETL处理
    2. 运行脚本:选择「脚本路径」,填写完整脚本路径/opt/dolphinscheduler/scripts/mysql_etl_process.py
    3. 执行环境:选择已安装依赖的Python3环境
    4. 重试策略:根据需求配置(建议重试1-2次,间隔30秒)

3. 配置调度规则

根据业务需求设置定时规则,例如:

  • 每小时执行一次:0 * * * *
  • 每日凌晨2点执行:0 2 * * *

保存工作流,手动触发一次测试运行。

五、任务测试与日志查看

1. 手动运行测试

点击工作流「手动运行」,等待任务执行完成,状态显示「成功」即为正常。

2. 日志排查问题

若任务运行失败,可点击节点「查看日志」,常见日志信息:

  • 读取条数、清洗后数据量、写入条数,方便核对数据完整性
  • 数据库连接失败、权限不足、字段不匹配等报错信息,快速定位问题

3. 数据结果校验

登录MySQL,查询目标表 user_clean,确认数据已成功写入、字段格式标准化、无脏数据。

六、常见踩坑总结

1. 依赖库报错:ModuleNotFoundError

原因:DolphinScheduler 执行机器的Python环境未安装 pandas/pymysql

解决:在执行任务的worker节点执行pip安装,而非本地机器

2. MySQL连接超时/拒绝连接

原因:数据库未开放远程权限、防火墙拦截、账号密码错误

解决:授权数据库远程访问,关闭服务器防火墙,核对数据库配置参数

3. 数据写入重复

原因:定时任务重复执行,无去重逻辑

优化:可增加每日清空目标表根据唯一字段去重更新逻辑

4. 脚本权限不足

原因:dolphinscheduler用户无脚本读取、执行权限

解决:修改脚本目录权限,授权dolphinscheduler用户访问

七、进阶优化方向

  1. 配置解耦:将数据库账号、密码写入配置文件,避免硬编码泄露
  2. 增量同步:增加时间戳条件,只读取新增/修改数据,全量改增量,提升效率
  3. 异常告警:对接钉钉/企业微信机器人,任务失败自动推送告警消息
  4. 事务控制:增加批量事务,保证数据写入原子性,避免部分写入脏数据
  5. 参数化调度:通过DolphinScheduler全局参数,动态传递同步时间、表名

八、总结

DolphinScheduler + Python + MySQL 的组合,是中小型数据ETL、自动化数据处理的最优轻量方案之一。

相比于原生Shell脚本、本地Python脚本,它具备可视化调度、任务监控、失败重试、日志追溯、定时自动化的能力,完全可以满足日常数据清洗、同步、统计报表等业务需求,上手简单、落地成本极低。

本篇文章的完整代码、表结构、配置步骤均可直接复用,大家可以根据自身业务的表结构、处理逻辑灵活修改!

正文完
可以使用微信扫码关注公众号(ID:xzluomor)
post-qrcode
 0
评论(没有评论)
验证码