由 Wipro 通过 AWS 大数据博客

利用 CI/CD、Amazon MWAA 和 Apache Spark 在 Amazon EMR 上构建和运行大规模数据管道

关键要点

  • 传统的 ETL 工具通常复杂且费用高昂,尤其对中小企业客户而言。
  • Wipro 开发了一种程序化数据处理框架,可以帮助数据工程师有效地优化 ETL 流程。
  • 该框架利用 Amazon EMR 和 AWS 管理服务,实现数据管道的自动化和可扩展性。

在当今数据驱动的环境中,各种规模的企业都面临着传统提取、转换和加载(ETL)工具所带来的复杂性和限制。这些工具虽然强大,但往往伴随着可观的财务负担,尤其是对中小企业客户而言。除了高额的采购和许可费用外,客户还需要面临安装、维护和升级相关的支出,这些支出可能会给预算带来负担。Wipro在与客户的沟通中发现,改善数据管道的可扩展性与自动化依然是客户的核心关注点,并且只有通过大量努力才能实现。随着数据量的不断增加,这些工具很难跟上日益增长的需求,导致处理延迟和数据传输的中断,这是在及时洞察至关重要的时代中一个显著的瓶颈。

本文讨论了 Wipro 开发的程序化数据处理框架如何帮助数据工程师克服挑战并简化组织的 ETL 流程。该框架利用了 和与 AWS 管理服务的集成。该框架强大且能够连接多个数据源与目标。通过使用 AWS管理服务的能力,框架消除了传统 ETL工具中通常与基础设施管理相关的重负担,从而使客户可以更战略性地分配资源。此外,我们将展示框架的内置可扩展性如何确保企业能够轻松适应不断增加的数据量,从而在不断变化的数字环境中提升灵活性和响应能力。

解决方案概述

提出的解决方案帮助构建一个完全自动化的数据处理管道,以简化整个工作流。当代码推送到 Git时,自动触发流程,编排和调度作业处理,按照定义的规则验证数据,转换代码中规定的数据,并将转换后的数据集加载到指定的目标。该解决方案的主要组件是利用 开发的强大框架。该框架适用于任何 ETL过程,其中输入可能从各种数据源中获取、转换,并加载到指定的目标。为能够获得宝贵的洞察并提供整体作业监控和自动化,该框架与 AWS 管理服务集成:

服务描述
用于数据管道的编排、调度和执行。
在 Amazon EC2 上执行数据处理作业,提取源数据、执行数据验证和转换,然后加载数据到目标。
用于处理作业监控和进度通知。
用于存储构建工件。
Amazon EC2用于托管和运行 Jenkins 构建服务器。

解决方案步骤

![解决方案架构](https://d2908q01vomqb2.cloudfront.net/b6692ea5df920cad691c20319a6fffd7a4a766b8/2024/12/13/BDB-3581-Building- 删除)

解决方案架构如上图所示,包括:

  1. 用于数据处理的持续集成和交付(CI/CD)
  2. 数据工程师可以在 JSON 模板中定义基础数据处理作业。可以在 上查看预定义模板的语法。整体目标包括:
    • 编写将在 Amazon EMR 上执行的 Spark 作业配置。
    • 将数据处理分为三个阶段:
    • 并行提取源数据、验证源数据和准备数据集以进行进一步处理。
    • 提供灵活性以编写 JSON 中定义的业务转换规则,包括例如重复记录、空值检查和移除等的数据验证。还可以包括任何用 Apache Spark SQL 编写的基于 SQL 的转换。
    • 将转换后的数据集加载到目标,并根据需要进行对账。

每个阶段的步骤均被记录以供审计、错误报告、故障排除和安全目的。

  1. 在数据工程师根据第 1 步中规定的模板准备好配置文件并提交到 Git 存储库后,将触发 Jenkins 管道。Jenkins 是一个开源持续集成工具,在 EC2 实例上运行,处理配置文件,构建(编译 Spark 应用程序代码)结束工件——一个 JAR 文件和一个将被复制到 S3 存储桶的配置文件(.conf),后者将在后续由 Amazon EMR 使用。
  2. 数据管道的 CI/CD
  3. 数据处理作业编写完成后,数据工程师可以采用类似的代码驱动开发方法来定义数据处理管道,以调度、编排和执行数据处理作业。Apache Airflow 是用于开发、调度和监控批处理工作流的热门开源平台。在此解决方案中,我们利用 Amazon MWAA 通过有向无环图(DAG)执行数据管道。为简化工程师构建所需的 DAG,可以以简单的 YAML 定义数据管道。可以在 上查看 YAML 文件示例。
  4. 当用户将包含 DAG 细节的 YAML 文件提交到项目 Git 存储库时,将触发另一个 Jenkins 管道。
  5. Jenkins 管道现在读取 YAML 配置文件,并根据任务和依赖关系生成 DAG 脚本文件,该文件被复制到配置的 S3 存储桶中。
  6. Airflow DAG 执行
  7. 在数据处理作业和数据管道都配置完成后,Amazon MWAA 从 S3 存储桶中检索最新的脚本,以在 Airflow 用户界面中显示最新的 DAG 定义。这些 DAG 至少包含三个任务,除了创建和终止 EMR 集群外,每个任务都代表一个 ETL 过程。示例 DAG 代码可在 中找到。
  8. 如作业定义的调度所指定的,Airflow 执行创建 Amazon EMR 的任务,以在 EC2 实例上启动 Amazon EMR 集群。集群创建后,ETL 处理作为步骤提交给 Amazon EMR。
  9. Amazon EMR 并行执行这些步骤(Amazon EMR 提供定义同一时间处理多少步骤的 )。任务完成后,为节省成本而终止 Amazon EMR 集群。
  10. ETL 处理
  11. 每个由 Airflow 提交给 Amazon EMR 的步骤,也通过 Spark 提交命令包含配置文件的 S3 存储桶路径,作为参数传递。
  12. 根据配置文件获取输入数据并应用技术验证。如果在数据处理作业中启用了数据映射,则根据给定的架构准备结构化数据。此输出将传递给下一阶段,以应用数据转换和业务验证。
  13. 对转换后的数据应用一系列对账规则,以确保数据质量。完成此步骤后,数据加载到指定目标中。

![ETL数据处理作业](https://d2908q01vomqb2.cloudfront.net/b6692ea5df920cad691c20319a6fffd7a4a766b8/2024/12/13/BDB-3581-Building- 删除)

  1. 日志记录、监控和通知
  2. Amazon MWAA 提供从每个任务生成的日志,可在 Airflow UI 中查看。使用这些日志,您可以监控 Apache Airflow 任务的详细信息、延迟和工作流错误。
  3. Amazon MWAA 还定期向 Amazon EMR 集群发送请求,以获取正在执行步骤的最新状态,并相应更新任务状态。
  4. 如果某个步骤失败,例如由于高流量而未能建立数据库连接,Amazon MWAA 将重复该过程。
  5. 每当某个任务失败时,Amazon SNS 将向预配置的收件人发送电子邮件通知,附上失败原因和日志。

此解决方案的关键能力包括:

关键能力描述
完全自动化用户将配置文件提交到 Git 后,其余过程完全自动化,CI/CD 管道会部署工件和 DAG 代码。DAG 代码在预定时间在 Airflow 中执行。所有 ETL 作业都被记录和监控,并在出现任何失败时发送电子邮件通知。
灵活集成应用程序可以轻松地以最小的努力适应新的 ETL 过程。要创建新流程,只需准备包含源和目标详细信息及必要转换逻辑的配置文件即可。以下示例展示了如何指定数据转换:
容错性除了 Apache Spark 的容错能力外,该解决方案还提供即使在 Amazon EMR 被终止后也能恢复数据的能力。该应用程序解决方案分为三个阶段。如果 Apache Spark 作业出现故障,则最后一个成功阶段的输出将暂时存储在 Amazon S3。通过 Airflow DAG 重新触发作业时,Apache Spark 作业将从上次失败的地方恢复,从而确保业务连续性,最大限度减少工作流的中断。
可扩展性如下图所示,Amazon EMR 集群配置为使用实例池选项,根据数据的大小动态调整节点数量,确保该应用程序适合数据需求日益增长的企业。
可自定义该解决方案可根据特定用例的需求进行自定义,允许您根据独特的数据管理要求添加自己的转换、验证和对账。
增强的数据灵活性通过支持多种文件格式,Apache Spark 应用程序和 Airflow DAG 能够无缝集成并处理来自各个源的数据。这种优势使数据工程师能够处理多种文件格式,包括 JSON、XML、文本、CSV、Parquet、Avro 等。
并发执行Amazon MWAA 将任务提交给 Amazon EMR 进行并发执行,利用分布式计算的可扩展性和性能高效处理大量数据。
主动错误通知每当任务失败时,系统会及时向预配置的收件人发送电子邮件通知,从而提供对故障的及时了解,促进快速故障排除。

注意事项

在我们的部署中,我们发现一个 DAG 完成的平均时间为 15-20 分钟,包含 18 个 ETL 过程,并处理每个 90 万到 120万条记录。如果您想进一步优化 DAG 完成时间,可以根据 的描述配置 Amazon MWAA 控制台中的 core.dag_concurrency

结论

Wipro 开发的基于代码的数据处理框架利用 和 Amazon MWAA 展现了一种应对传统 ETL工具挑战的解决方案。通过利用开源框架的能力并无缝集成 AWS 服务,企业能够构建高效、可扩展和自动化的数据处理管道。

现在您已经了解了如何使用 Amazon EMR Runtime for Apache Spark 结合 Amazon MWAA,并鼓励您利用 AmazonMWAA 创建能够在 Amazon EMR Runtime for Apache Spark 上运行的 ETL 作业工作流。

在本文中提到的配置文件示例和示例 DAG 代码可在 中找到。

参考文献

免责声明

示例代码、软件库、命令行工具、概念证明、模板或其他相关技术作为 AWS 内容或第三方内容提供,需符合 AWS 客户协议或您与 AWS之间的相关书面协议。您不应在生产账户或生产或其他关键数据上使用此 AWS 内容或第三方内容。性能指标(包括所述的 DAG完成时间)可能会因特定的部署环境而有所不同。您有责任对 AWS内容或第三方内容进行测试、安全性和优化,以确保适合基于特定质量控制实践和标准的生产级使用。部署 AWS 内容或第三方内容可能会因创建或使用 AWS收费资源(例如运行 Amazon EC2 实例或使用 Amazon S3 存储)而产生 AWS 费用。


作者介绍

![Deependra

删除)DeependraShekhawat 是一位经验丰富的能源与公用事业行业解决方案架构师,驻悉尼,澳大利亚。在他的角色中,Deependra帮助亚太地区的能源公司使用云技术推动可持续性和运营效率,专注于创建强大的数据基础和高级工作流,以使组织能够利用大数据、分析和机器学习的力量解决关键的行业挑战。

![Senaka

删除)SenakaAriyasinghe 是 AWS 全球系统集成商的高级合作伙伴解决方案架构师。在他的角色中,Senaka 指导亚太地区的 AWS合作伙伴设计和扩展架构良好的解决方案,专注于生成性 AI、机器学习、云迁移和应用现代化项目。

![Sandeep

删除)SandeepKushwaha 是 Wipro 的高级数据科学家,在大数据和机器学习方面拥有丰富的经验。Sandeep 精通 ApacheSpark,并设计和实现前沿的云解决方案,优化数据处理并推动效率。他在使用 AWS服务和最佳实践中的专业知识,加上对数据管理和自动化的深入了解,使他能够领导成功项目,解决复杂的技术挑战并交付高影响结果。

加载评论……

Leave a Reply

Required fields are marked *