使用 AWS Lambda 作为处理器为

使用 AWS Lambda 生成数据的向量嵌入,以处理 Amazon OpenSearch 进程

作者:Jagadish Kumar、Sam Selvan 和 Srikanth Govindarajan
发布日期:2025 年 1 月 21 日
阅读更多关于 :[高级 (300)](https://aws.amazon.com/blogs/big- data/category/learning-levels/advanced-300/ "查看所有高级 (300) 文章")、、、

关键要点

  • 凭借 AWS Lambda 处理器,OpenSearch Ingestion 提供了更多灵活性来丰富和转换日志、指标和追踪数据。
  • 本文展示了如何使用 Lambda 处理器生成向量嵌入并将其输入到 OpenSearch Serverless 向量集合中。
  • 使用无服务器架构,AWS 管理基础设施,简化了操作。

在 2024 年 11 月 22 日,Amazon OpenSearch Ingestion 对 AWS Lambda 处理器的支持。借助这项功能,您可以在 OpenSearch Ingestion管道中更灵活地丰富和转换日志、指标和跟踪数据。例如,您可以使用基础模型(FMs)为数据生成向量嵌入,或者查找外部数据源(如 )来增强数据。

是一个完全托管的无服务器数据管道,能够将实时日志、指标和追踪数据传递到 域和 集合。

是 OpenSearch Ingestion管道中的组件,它可以让您在将记录发布到目标之前,使用所需格式过滤、转换和丰富事件。如果在管道配置中没有定义处理器,则事件会按照源组件指定的格式发布。您可以在单个管道中包含多个处理器,它们会按照管道配置中定义的顺序依次运行。

OpenSearch Ingestion 允许将 Lambda函数作为处理器与内置的本地处理器一起使用,以便在转换数据时进行优化。您可以根据事件数量或大小将事件批量处理为单个有效载荷,然后再调用 Lambda,以提高管道性能和降低成本。Lambda使您能够在无需配置或管理服务器的情况下运行代码,消除了创建工作负载感知的集群扩展逻辑、维护事件集成或管理运行时的需求。

本文展示了如何使用 OpenSearch Ingestion 的 Lambda 处理器为您的源数据生成嵌入,并将其注入到 中。该解决方案利用 OpenSearch Ingestion 管道与 Lambda 处理器的灵活性,动态生成嵌入。Lambda函数将调用托管在 中的 ,实现高效且可扩展的嵌入创建。这种架构简化了诸如推荐引擎、个性化聊天机器人和欺诈检测系统等多种用例。

将 OpenSearch Ingestion、Lambda 和 OpenSearch Serverless集成在一起,创建了一个完全无服务器的嵌入生成和搜索管道。这种组合提供了自动扩展以匹配工作负载需求和基于使用的模型。操作简化,因为 AWS管理基础设施、更新和维护。这种无服务器的方法使您能够专注于开发搜索和分析解决方案,而不是管理基础设施。

请注意,Amazon OpenSearch Service 还提供了 ,可以将文本转换为向量,并在摄取数据和搜索时促进向量搜索。在摄取过程中,神经搜索将文档文本转换为向量嵌入,并在向量索引中索引文本及其向量嵌入。神经搜索仅适用于运行 2.9 及以上版本的托管集群,并且。

解决方案概述

该解决方案在 (Amazon S3) 中的一个数据集中构建嵌入。我们使用 Lambda 函数在 OpenSearch Ingestion 提供的有效载荷上调用 Amazon Titan 模型。

先决条件

您应拥有适当的角色权限,以调用您的 Lambda 函数和 Amazon Bedrock 模型,并向 OpenSearch Serverless集合写入数据。

为提供对集合的访问,您必须配置 (IAM) 管道角色,并授予对集合的访问权限策略。有关详细信息,请参阅 。以下是示例代码:

json { "Version": "2012-10-17", "Statement": ,包含电影信息,例如 originalTitleruntimeMinutes` 和类型。

OpenSearch Ingestion 管道利用 Lambda 处理器为字段 original_title 创建嵌入,并将嵌入存储为 original_title_embeddings,以及其他数据。

请查看以下管道代码:

提供请求 SQS 和 S3 的角色 sts_role_arn: "<>" scan: buckets: \- bucket: name:
"lambdaprocessorblog"

processor: \- aws_lambda: function_name: "generate_embeddings_bedrock"
response_events_match: true tags_on_failure: ["lambda_failure"] batch:
key_name: "documents" threshold: event_count: 4 aws: region: us-west-2sts_role_arn: "<>" sink: \- opensearch: hosts: \-
'https://myserverlesscollection.us-region.aoss.amazonaws.com' index: imdb-
data-embeddings aws: sts_role_arn: "<>" region: us-west-2 serverless: true ```

让我们仔细看看摄取管道中的 Lambda 处理器。请注意 `key_name` 参数。您可以为 key_name 选择任何值,您的 Lambda函数需要在处理来自 OpenSearch Ingestion 的有效载荷时引用此键。有效载荷的大小由批处理设置决定。当在 Lambda处理器中启用批处理时,OpenSearch Ingestion 会在调用 Lambda函数之前将多个事件分组为一个有效载荷。当满足以下任一阈值时,批量将发送到 Lambda:

  * **event_count** – 事件数量达到指定限制。
  * **maximum_size** – 批量的总大小达到指定大小(例如,5MB),可配置到 6MB(AWS Lambda 的调用有效载荷限制)上限。

### Lambda 函数

Lambda 函数从 OpenSearch Ingestion 接收数据,调用 Amazon Bedrock生成嵌入,并将其添加到源记录中。“documents” 用于引用来自 OpenSearch Ingestion 的事件,并与管道中声明的
`key_name` 匹配。我们将来自 Amazon Bedrock 的嵌入值附加回原始记录。然后,包含附加嵌入值的新记录会通过 OpenSearchIngestion 发送到 OpenSearch Serverless 的接收端。请参见以下代码:

```python import json import boto3 import os

# 初始化 Bedrock 客户端

bedrock = boto3.client('bedrock-runtime')

def generate_embedding(text): """使用 Bedrock 为给定文本生成嵌入。""" response =
bedrock.invoke_model( modelId="amazon.titan-embed-text-v1",
contentType="application/json", accept="application/json",
body=json.dumps({"inputText": text}) ) embedding =
json.loads(response['body'].read())['embedding'] return embedding

def lambda_handler(event, context): # 假定输入是 JSON 文档的列表 documents =
event['documents']

    
    
    processed_documents = []
    
    for doc in documents:
        if 'originalTitle' in doc:
            # 为 'originalTitle' 字段生成嵌入
            embedding = generate_embedding(doc['originalTitle'])
    
            # 将嵌入添加到文档中
            doc['originalTitle_embeddings'] = embedding
    
        processed_documents.append(doc)
    
    # 返回处理后的文档
    return processed_documents
    

在使用 lambda处理器时,如果出现任何异常,批处理中的所有文档都将被视为失败事件,并转发到下一个处理器链(如果有)或带有失败标签的接收端。该标签可以通过 tags_on_failure 参数配置到管道,错误信息也会发送到 CloudWatch 日志进行进一步处理。

管道运行后,您可以查看到嵌入已创建并存储为文档中的 originalTitle_embeddings,并在 imdb- data-embeddings 中找到。以下截图展示了一个示例。

删除)

总结

在本文中,我们展示了如何将 Lambda 用作 OpenSearch Ingestion管道的一部分,以实现复杂的数据转换和丰富。如需了解有关该功能的更多详细信息,请参见 。


关于作者

![Jagadish删除) Jagadish Kumar (Jag) 是 AWS 的高级专家解决方案架构师,专注于 Amazon OpenSearchService。他对数据架构充满热情,帮助客户在 AWS 上构建可扩展的分析解决方案。

![SamSelvan](https://d2908q01vomqb2.cloudfront.net/b6692ea5df920cad691c20319a6fffd7a4a766b8/2020/09/09/sam- 删除) Sam Selvan 是 Amazon OpenSearch Service 的首席专家解决方案架构师。

![Srikanth删除) Srikanth Govindarajan 是 Amazon OpenSearch Service的软件开发工程师。他对架构基础设施和构建可扩展的搜索、分析、安全性、人工智能和机器学习基础用例解决方案充满热情。

Leave a Reply

Required fields are marked *