s3 转移到 AWS Kinesis 到 OpenSearch

huangapple go评论55阅读模式
英文:

s3 to aws kenesis to opensearch

问题

我有一个包含大约1000万个对象的S3存储桶,所有这些对象都需要进行处理并发送到OpenSearch。为此,我正在评估是否可以使用Kinesis来完成此任务。

在线上的解决方案似乎暗示要使用Lambda,但考虑到我有1000万个对象,我认为函数会在for循环用尽之前超时。

因此,我想要的设置是:

s3 --> (某个生产者) --> Kinesis 数据流 --> OpenSearch(目的地)

请问最佳操作方式是什么?

英文:

I have a s3 bucket with some 10M objects which all needs to be processed and sent to opensearch. To this end I am evaluating if kenisis can be used for this.

The solutions online seem to imply using lambda, but since I have 10M objects, I would think the function will timeout by the time the for loop is exhausted.

So, the setup I would like is:

s3 --> (some producer) --> kenesis data streams --> opensearch (destination)

What would be the optimal way to go about this please

答案1

得分: 2

Mark B的答案绝对是一个可行的选项,我建议配置您的SQS队列以触发Lambda以处理每条消息。

除非您需要Kinesis进行某些ETL功能,否则您可以直接从S3到OpenSearch。

假设S3中的文档适合OpenSearch,我会采取以下一种方式:

  1. AWS Step Functions具有一个内置模式来处理S3中的项目。这将迭代选择的存储桶(或文件夹等)中与您的描述匹配的所有对象。然后,可以将每个对象发送到Lambda函数以将其内容保存到OpenSearch中。
    • 假设您有一些ETL或格式要求,这在Lambda中很容易实现。
    • 我找不到SFN S3模式的任何文档,但它们在Workflow Studio中可用,请参阅此截图
  2. 如果您熟悉Python,AWS SDK for Pandas(以前称为AWS Data Wrangler)是一个非常简单的选项。我已经广泛使用它来轻松将数据从CSV、S3和其他位置移动到OpenSearch中。

使用AWS SDK for Pandas,您可以像这样实现您要查找的内容...

import awswrangler as wr
from opensearchpy import OpenSearch

items = wr.s3.read_json(path="s3://my-bucket/my-folder/")

# 连接+上传到OpenSearch
my_client = OpenSearch(...)
wr.opensearch.index_df(client=my_client, df=items)

AWS SDK for Pandas可以迭代S3项目的块,并且有一个关于从S3到OpenSearch索引JSON(和其他文件类型)的教程

英文:

Mark B's answer is definitely a viable option, and I'd suggest configuring your SQS queue to trigger Lambda for each message.

Unless you need Kinesis for some ETL functionality, it's likely that you can go from S3 to OpenSearch directly.

Assuming the docs in S3 are formatted suitably for OpenSearch, I would take one of the following approaches:

  1. AWS Step Functions has a built-in pattern to process items in S3. This would iterate over all the objects in a chosen bucket (or folder, etc.) that match your description. Each object could then be sent to a Lambda function to save its contents to OpenSearch.
    • Assuming you have some ETL or formatting requirements, this would be easy to implement in Lambda.
    • I can't find any documentation for the SFN S3 Patterns, but they're available in Workflow Studio, see this screenshot.
  2. If you're comfortable with Python, the AWS SDK for Pandas (previously AWS Data Wrangler) is a super easy option. I've used it extensively for moving data from CSVs, S3, and other locations into OpenSearch with ease.

Using the AWS SDK for Pandas, you might achieve what you're looking for like this...

import awswrangler as wr
from opensearchpy import OpenSearch

items = wr.s3.read_json(path="s3://my-bucket/my-folder/")

# connect + upload to OpenSearch
my_client = OpenSearch(...)
wr.opensearch.index_df(client=my_client, df=items)

The AWS SDK for Pandas can iterate over chunks of S3 items, and there's a tutorial on indexing JSON (and other file types) from S3 to OpenSearch.

答案2

得分: 1

这是一篇关于此主题的官方博客文章,建议使用DMS服务将S3中的现有文件发送到Kinesis。

至于使用Lambda的所有建议,那些建议适用于文件尚未在S3中的情况,每次上传文件到S3时都会触发Lambda,只处理一个文件。没有人建议您在单个函数调用中使用Lambda来处理1000万个现有的S3文件。

如果您想要在当前情况下使用Lambda,您可以首先创建所有S3对象的列表,然后编写一个一次性脚本,将该对象列表输入到SQS队列中。然后,您可以有一个Lambda函数,随着时间的推移处理队列中的消息,从队列中获取对象键,从S3中读取文件,然后将其发送到Kinesis。Lambda可以以每次最多处理10个的批量方式处理这些文件。然后,您可以配置S3存储桶以将新对象通知发送到相同的SQS队列,Lambda将自动处理您稍后添加到存储桶的任何新对象。

英文:

Here's an official blog post on the subject that suggests using the DMS service to send existing files in S3 to Kinesis.

As for all the recommendations to use Lambda, those recommendations would be for the scenario where the files aren't in S3 yet, and Lambda would be triggered each time a file is uploaded to S3, processing just that one file. Nobody is recommending you use Lambda to process 10M existing S3 files in a single function invocation.

If you wanted to use Lambda for your current scenario, you could first create a list of all your S3 objects, and write a one-time script that feeds that list of objects into an SQS queue. Then you could have a Lambda function that processes the messages in the queue over time, taking the object key from the queue, reading the file from S3, and sending it to Kinesis. Lambda could process those in batches of up to 10 at a time. Then you could have the S3 bucket configured to send new object notifications to the same SQS queue, and Lambda would automatically processes any new objects that you add to the bucket later.

huangapple
  • 本文由 发表于 2023年5月7日 20:04:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/76193813.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定