Does Apache Beam Elastic IO module support updates to existing documents by Id?
Question
I could not find any documentation on updating documents in Elasticsearch with Apache Beam.
Does Apache Beam support updates for Elasticsearch?
Answer 1
Score: 1
With Beam Java, you can use ElasticsearchIO.
Example of writing a PCollection to Elasticsearch:
    pipeline
        .apply(...)
        // ConnectionConfiguration.create takes an array of node addresses
        .apply(ElasticsearchIO.write().withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(
                new String[] {"http://host:9200"}, "my-index", "my-type")));
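Important clarification: as far as I know, the IO does not support update-type operations out of the box; by default the write method only adds (indexes) elements in Elasticsearch. It is still worth checking the ElasticsearchIO.Write Javadoc for your Beam version, since options such as withIdFn and withUsePartialUpdate can change this behavior.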
If you must apply update operations, you have three options:
- Use the existing IO's write and accept duplicates, then, in a separate process outside of Beam, remove the duplicates and keep only the latest element in Elasticsearch. For example, you can use a DAG orchestrator and run a service after the Dataflow job.
- Use an existing Elasticsearch client that has an update method and call it from a ParDo.
- Develop your own IO based on an existing client that has an update method.
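To make the second option a bit more concrete, here is a minimal sketch of the request such a ParDo could send per element. It uses only the JDK's HTTP classes rather than a real Elasticsearch client, and it assumes the typeless Update API of Elasticsearch 7+ (POST /{index}/_update/{id}); older clusters that still use mapping types expose a different path. The class EsUpdateSketch and its methods are hypothetical names made up for illustration, and the partial document is assumed to be serialized to JSON already.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: not part of Beam or of any Elasticsearch client library.
final class EsUpdateSketch {

    // Path of the typeless Update API (assumed Elasticsearch 7+): /{index}/_update/{id}
    static String updatePath(String index, String id) {
        // URLEncoder produces form encoding, so map "+" back to "%20" for use in a path.
        String safeId = URLEncoder.encode(id, StandardCharsets.UTF_8).replace("+", "%20");
        return "/" + index + "/_update/" + safeId;
    }

    // Wraps an already serialized partial document.
    // doc_as_upsert=true asks Elasticsearch to create the document if the id does not exist yet.
    static String updateBody(String partialDocJson, boolean docAsUpsert) {
        return "{\"doc\":" + partialDocJson + ",\"doc_as_upsert\":" + docAsUpsert + "}";
    }

    // Builds (but does not send) the HTTP request for one partial update.
    static HttpRequest updateRequest(String baseUrl, String index, String id, String partialDocJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + updatePath(index, id)))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(updateBody(partialDocJson, true)))
                .build();
    }
}
```

Inside a DoFn, one plausible arrangement is to create a java.net.http.HttpClient once per worker in a @Setup method and send the request built above from @ProcessElement. For larger volumes, batching several updates into one bulk request would likely be the better design; this sketch deliberately leaves that out.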
This IO doesn't exist natively for Beam Python.
You can check this link for the list of existing IOs in Beam.