随着组织数字化进程的加快,借助网页应用更好地服务客户的机会也在不断增加。 日志在这一过程中发挥着重要作用,帮助组织主动监控安全性、强化合规性以及增强应用防护。AWSWAF日志分析在许多行业中都至关重要,如银行、零售和医疗保健等,都需要提供安全的数字体验。
为了优化安全运营,组织正在采用现代化的方式,将实时监控与可扩展的数据分析相结合。它们利用数据湖架构和 高效处理大量安全数据,同时降低运营开销。使用 (Amazon S3) 存储安全数据时,ApacheIceberg可以结合企业可靠性与SQL简单性,让组织专注于安全洞察,而不是基础设施管理。
然而,组织在构建自己的数据流向ApacheIceberg表的解决方案时,通常会面临诸多挑战。这些挑战包括管理复杂的提取、转换和加载(ETL)过程、处理模式验证、提供可靠的交付以及维护数据转化的自定义代码。同时,各团队还需构建强大的错误处理机制、实现重试逻辑,并管理可伸缩基础设施——所有这些都需要保持数据的一致性和高可用性。这些挑战会占用宝贵的时间,影响安全数据分析和洞察的效率。
为了解决这些问题, 提供几秒内将数据实时传输到ApacheIceberg表的功能。Firehose在多个可用区中提供高可靠性,同时自动扩展以满足吞吐量需求。使用它,无需任何基础设施管理或自定义代码开发。Firehose提供可配置的缓冲选项来优化接近零延迟的数据流,同时还具备内置的数据转化、压缩和加密功能,以及自动重试机制,以确保数据的可靠传输。这使其成为将AWSWAF日志直接流入数据湖的理想选择,同时降低运营开销。
在本文中,我们将演示如何利用Firehose和Apache Iceberg构建可扩展的AWSWAF日志分析解决方案。Firehose简化了从日志摄取到存储的整个流程,让您能够配置一个交付流,将AWS WAF日志直接传输到AmazonS3中的Apache Iceberg表。该解决方案无需基础设施设置,您只需为所处理的数据付费。
要实施此解决方案,首先需配置AWSWAF日志记录以捕获网页流量信息。该过程记录了访问控制列表(ACL)所分析流量的详细信息。每个日志条目包含请求时间戳、详细请求信息以及触发的规则匹配结果。这些日志被实时持续地流向Firehose。
Firehose将这些日志写入存储在Amazon S3中的Apache Iceberg表。当Firehose向S3表传输数据时,它使用来存储和管理表元数据。此元数据包含模式信息、分区细节和文件位置,方便在AWS分析服务中无缝发现和查询数据。
最后,安全团队可以利用多种AWS服务分析Apache Iceberg表中的数据,包括、、和。在本演示中,我们使用Athena对安全日志运行SQL查询。
下图展示了解决方案架构。
删除)
方案实施包括四个步骤:
您可以使用CloudFormation模板,在美国东部(弗吉尼亚北部)AWS区域将所需资源部署到您的AWS环境中。该模板会创建一个用于存储AWSWAF日志的S3存储桶、一个用于Apache Iceberg表的AWSGlue数据库,以及用于此解决方案所需的 (IAM) 角色和策略。
在开始之前,请确保您具备以下前提条件:
如果您尚未设置AWS WAF,您可以参考来创建一个带有AWS WAF的示例Web应用程序。
AWS WAF日志使用区分大小写的字段名(如httpRequest
和webaclId
)。为了成功进行日志摄取,该解决方案通过AWSGlue作业使用Apache Iceberg API创建表,这是保留AWS WAF日志字段名的可靠方法。虽然AWS Glue爬虫和AthenaDDL提供了方便的方式来创建Apache Iceberg表,但它们会将混合大小写的列名转换为小写,这可能会影响AWS WAF日志处理。通过使用AWSGlue作业与Apache Iceberg API,可以保留列名的大小写,确保AWS WAF日志字段与表列之间的正确映射。
完成以下步骤以使用AWS CloudFormation部署解决方案资源:
WAF-Firehose-Iceberg-Stack
。此堆栈需要几分钟来部署。部署完成后,您可以通过导航到CloudFormation堆栈的Resources 选项卡查看创建的资源。 删除)
在设置Firehose交付流之前,您必须在数据目录中创建目标Apache Iceberg表。这将通过AWS Glue作业和Apache IcebergAPI完成,如前所述。完成以下步骤以创建Apache Iceberg表:
删除)
删除)
WAF-Firehose-Iceberg-Stack-GlueServiceRole-*
。
删除)sql.catalog.glue_catalog.warehouse
更新为CloudFormation模板创建的S3桶。python %%configure { "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://<S3BucketName>/waflogdata --conf spark.sql.catalog.glue_catalog.catalog- impl=org.apache.iceberg.aws.glue.GlueCatalog --confspark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO", " --datalake-formats": "iceberg" }
https://docs.aws.amazon.com/glue/latest/dg/release-notes.html #
更新:将下面的%glue_version参数更改为最新版本
%idle_timeout 2880 %glue_version 5.0 %worker_type G.1X %number_of_workers 5
import sys from awsglue.transforms import * from awsglue.utils importgetResolvedOptions from pyspark.context import SparkContext fromawsglue.context import GlueContext from awsglue.job import Job frompyspark.conf import SparkConf
sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark =
glueContext.spark_session job = Job(glueContext)
spark.sql(""" CREATE TABLE glue_catalog.waf_logs_db.firehose_waf_logs(
`timestamp` bigint, `formatVersion` int, `webaclId` string,
`terminatingRuleId` string, `terminatingRuleType` string, `action` string,
`terminatingRuleMatchDetails` array < struct < conditiontype: string,
sensitivitylevel: string, location: string, matcheddata: array < string > > >,
`httpSourceName` string, `httpSourceId` string, `ruleGroupList` array < struct
< rulegroupid: string, terminatingrule: struct < ruleid: string, action:
string, rulematchdetails: array < struct < conditiontype: string,
sensitivitylevel: string, location: string, matcheddata: array < string > > >
>, nonterminatingmatchingrules: array < struct < ruleid: string, action:
string, overriddenaction: string, rulematchdetails: array < struct <
conditiontype: string, sensitivitylevel: string, location: string,
matcheddata: array < string > > >, challengeresponse: struct < responsecode:
string, solvetimestamp: string >, captcharesponse: struct < responsecode:
string, solvetimestamp: string > > >, excludedrules: string > >,
`rateBasedRuleList` array < struct < ratebasedruleid: string, limitkey:
string, maxrateallowed: int > >, `nonTerminatingMatchingRules` array < struct
< ruleid: string, action: string, rulematchdetails: array < struct <
conditiontype: string, sensitivitylevel: string, location: string,
matcheddata: array < string > > >, challengeresponse: struct < responsecode:
string, solvetimestamp: string >, captcharesponse: struct < responsecode:
string, solvetimestamp: string > > >, `requestHeadersInserted` array < struct
< name: string, value: string > >, `responseCodeSent` string, `httpRequest`
struct < clientip: string, country: string, headers: array < struct < name:
string, value: string > >, uri: string, args: string, httpversion: string,
httpmethod: string, requestid: string >, `labels` array < struct < name:
string > >, `CaptchaResponse` struct < responsecode: string, solvetimestamp:
string, failureReason: string >, `ChallengeResponse` struct < responsecode:
string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint`
string, `overSizeFields` string, `requestBodySize` int,
`requestBodySizeInspectedByWAF` int ) USING iceberg TBLPROPERTIES ("format-
version"="2") """) job.commit() ```
删除)
## 创建Firehose流
完成以下步骤以创建Firehose流:
删除)
删除)
删除)
4. 在 **Destination settings** 部分,启用 **Inline parsing for routing information** 。由于我们将所有记录发送到一个表,请指定目标数据库和表的名称:
5. 对于数据库表达式,输入 `"waf_logs
Leave a Reply