Real-time processing of a DynamoDB table with AWS Lambda
Question
I am working with a stream of data that comes from an IoT rule and writes the readings to a DynamoDB table.
I need to rearrange that data to make predictions. Every new row triggers a Lambda function, but the function needs the last 96 table rows to work on.
My questions are:
1- How can I query the table?
2- How can I transform the result into something familiar, like a pandas DataFrame?
The table has a timestamp column (its format is still open to be decided).
Answer 1
Score: 2
Ultimately it depends on how your data is structured and how you want to access it. But to answer your question: when DynamoDB Streams invokes your Lambda, you can use a `Query` or a `Scan` to retrieve the data from DynamoDB.
My assumption here is that you need to retrieve the last 96 updates based on timestamp. A `Scan` will not let you do that efficiently, because it reads the whole table and does not return items in timestamp order. You would need to use a `Query`; however, that will dictate your data model.
Depending on your specific use-case needs, I would create a Global Secondary Index with a static partition key, such as `gsi_pk=1`, and your timestamp as the sort key:
gsi_pk | gsi_sk | data |
---|---|---|
1 | 2023-01-28T07:25:40.824Z | data |
1 | 2023-02-28T07:25:40.824Z | data |
1 | 2023-03-28T07:25:40.824Z | data |
1 | 2023-04-28T07:25:40.824Z | data |
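Since the timestamp format is still open, one option is a fixed-width UTC ISO 8601 string like the values in the table, because such strings sort lexicographically in chronological order, which is what a string sort key needs. The following is a minimal sketch under that assumption; `make_item` is a hypothetical helper, and the attribute names simply follow the table above.

```python
from datetime import datetime, timezone


def make_item(reading, now=None):
    """Build an item dict for one reading (hypothetical helper, not a boto3 API)."""
    now = now or datetime.now(timezone.utc)
    utc = now.astimezone(timezone.utc)
    # Fixed-width UTC ISO 8601 with milliseconds, e.g. 2023-01-28T07:25:40.824Z
    ts = utc.strftime('%Y-%m-%dT%H:%M:%S.') + f'{utc.microsecond // 1000:03d}Z'
    # Static partition key, timestamp as the sort key
    return {'gsi_pk': '1', 'gsi_sk': ts, 'data': reading}


earlier = make_item('a', datetime(2023, 1, 28, 7, 25, 40, 824000, tzinfo=timezone.utc))
later = make_item('b', datetime(2023, 2, 28, 7, 25, 40, 824000, tzinfo=timezone.utc))
```

Comparing `earlier['gsi_sk'] < later['gsi_sk']` as plain strings gives the same order as comparing the underlying datetimes, as long as every item uses the same format.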
Now you can `Query` your GSI and be sure you get back the last 96 items, newest first. GSIs are eventually consistent, so a just-written item may not appear in the index immediately.
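import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource('dynamodb').Table('my-table')  # placeholder table name

response = table.query(
    IndexName='my-index',
    # the value must match the stored type: use 1 instead of '1' if gsi_pk is a Number
    KeyConditionExpression=Key('gsi_pk').eq('1'),
    Limit=96,                # return at most 96 items
    ScanIndexForward=False   # descending sort-key order, newest first
)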
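For the second part of the question, the items in `response['Items']` are plain dicts, so they can be turned into rows fairly directly. The sketch below is one possible way, not the only one: `items_to_records` is a hypothetical helper, the `value` attribute is invented for illustration, and the sample items are hand-written stand-ins for a real query result. It converts `Decimal` numbers (which the boto3 resource API returns) to floats and reorders the rows oldest first.

```python
from decimal import Decimal


def items_to_records(items, time_key='gsi_sk'):
    """Convert DynamoDB items (newest first) into plain dicts, oldest first."""
    records = []
    for item in items:
        record = {}
        for name, value in item.items():
            # The boto3 resource API returns DynamoDB numbers as Decimal
            record[name] = float(value) if isinstance(value, Decimal) else value
        records.append(record)
    # Timestamps in one fixed ISO 8601 format sort chronologically as strings
    records.sort(key=lambda r: r[time_key])
    return records


# Hand-written items shaped like response['Items'] ('value' is an assumed attribute)
items = [
    {'gsi_pk': '1', 'gsi_sk': '2023-02-28T07:25:40.824Z', 'value': Decimal('21.5')},
    {'gsi_pk': '1', 'gsi_sk': '2023-01-28T07:25:40.824Z', 'value': Decimal('20.0')},
]
records = items_to_records(items)
```

Passing the result to `pandas.DataFrame(records)` then gives one row per item and one column per attribute, assuming pandas is packaged with the Lambda function.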