Does ElasticsearchIO for Apache Beam (Java) support templating and ValueProvider arguments? Error while invoking templates
Question
I am trying to create an Apache Beam template that indexes data into Elasticsearch. The template is created successfully, but when I invoke it the pipeline fails with a "no protocol" error. This looks odd to me, because the error relates to a URL object.
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class StarterPipeline {
        private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

        public interface IndexToEsOptions extends PipelineOptions {
            @Description("Path of the gzip index file to read from")
            ValueProvider<String> getInputIndexFile();
            void setInputIndexFile(ValueProvider<String> value);

            @Description("Index name to index with")
            ValueProvider<String> getIndexName();
            void setIndexName(ValueProvider<String> value);

            @Description("Index template name")
            ValueProvider<String> getIndexTemplate();
            void setIndexTemplate(ValueProvider<String> value);

            @Description("URI for es")
            @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
            ValueProvider<String> getEsUri();
            void setEsUri(ValueProvider<String> value);
        }

        public static void main(String[] args) {
            IndexToEsOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(IndexToEsOptions.class);
            Pipeline p = Pipeline.create(options);
            p.apply(TextIO.read().from(options.getInputIndexFile()))
                .apply(ElasticsearchIO.write()
                    .withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration.create(
                                new String[]{options.getEsUri().toString()},
                                options.getIndexName().toString(),
                                options.getIndexTemplate().toString())
                            .withConnectTimeout(240))
                    .withMaxBatchSizeBytes(15 * 1024 * 1024));
            p.run();
        }
    }
The error I get when I run it is:
> java.lang.IllegalArgumentException: Cannot get Elasticsearch version
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
> org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
> org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
> org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
> ... 14 more
> Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1475)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
> Caused by: java.net.MalformedURLException: no protocol: RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
> java.base/java.net.URL.<init>(URL.java:627)
> java.base/java.net.URL.<init>(URL.java:523)
> java.base/java.net.URL.<init>(URL.java:470)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$ConnectionConfiguration.createClient(ElasticsearchIO.java:417)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1457)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
> org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
> org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
> org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)
Answer 1
Score: 1
To put it simply: no, it doesn't look like ElasticsearchIO.ConnectionConfiguration supports ValueProviders, at least not as of the current release (2.22.0). You can see this by looking at the signature of ConnectionConfiguration.create:

    public static ElasticsearchIO.ConnectionConfiguration create(java.lang.String[] addresses,
                                                                 java.lang.String index,
                                                                 java.lang.String type)

Compare it to a method that does support ValueProviders, ElasticsearchIO.Read.withQuery:

    public ElasticsearchIO.Read withQuery(ValueProvider<java.lang.String> query)

To support ValueProviders, a method must actually accept a ValueProvider object. A ValueProvider is meant to deliver the parameter at run time, not at pipeline construction time, so during construction it has to be passed along as a ValueProvider everywhere it is used.
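
The difference between construction time and run time can be illustrated with a small model that needs only the JDK. This is a sketch, not Beam code: `Deferred`, `RuntimeDeferred`, and `connectStep` are hypothetical names invented for illustration, and the address is a placeholder. The `get()` and `isAccessible()` methods mirror the methods of the same name on Beam's ValueProvider.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class DeferredValueSketch {

    /** Hypothetical stand-in for a runtime parameter. This is not Beam's ValueProvider. */
    interface Deferred<T> {
        boolean isAccessible();
        T get();
    }

    /** A parameter whose value only exists once the runtime options are filled in. */
    static final class RuntimeDeferred implements Deferred<String> {
        private final String name;
        private final Map<String, String> runtimeOptions;

        RuntimeDeferred(String name, Map<String, String> runtimeOptions) {
            this.name = name;
            this.runtimeOptions = runtimeOptions;
        }

        @Override
        public boolean isAccessible() {
            return runtimeOptions.containsKey(name);
        }

        @Override
        public String get() {
            if (!isAccessible()) {
                throw new IllegalStateException(name + " is not available until run time");
            }
            return runtimeOptions.get(name);
        }
    }

    /** A step that supports deferred parameters: it keeps the wrapper and reads it late. */
    static Supplier<String> connectStep(Deferred<String> address) {
        return () -> "connecting to " + address.get();
    }

    public static void main(String[] args) {
        Map<String, String> runtimeOptions = new HashMap<>();
        Deferred<String> esUri = new RuntimeDeferred("esUri", runtimeOptions);

        // Construction time: the step is built, but the value does not exist yet.
        Supplier<String> step = connectStep(esUri);
        System.out.println("accessible at construction: " + esUri.isAccessible());

        // Run time: the option is supplied, and only now is get() called.
        runtimeOptions.put("esUri", "https://example.invalid:9200");
        System.out.println(step.get());
    }
}
```

A method that takes a plain String has no wrapper to hold on to, so it cannot defer the lookup in this way.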
In your example, you are calling toString() on the ValueProvider for esUri. Instead of a string containing your URL, you get a string representation of the ValueProvider itself, which looks like this: "RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}". That is why you are getting a MalformedURLException: ElasticsearchIO tries to parse that string as a URL, and the parse fails. Your code passes the index name and index template through toString() in the same way, so those arguments are affected as well.
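
The parse failure can be reproduced outside of Beam with the JDK's java.net.URL, which is the class that appears in the stack trace. The following is a minimal sketch: the `parse` helper is a hypothetical name, and the address is a placeholder rather than the real endpoint.

```java
import java.net.MalformedURLException;
import java.net.URL;

public class NoProtocolDemo {

    /** Returns "ok" if the text parses as a URL, otherwise the parser's error message. */
    @SuppressWarnings("deprecation") // new URL(String) is deprecated on recent JDKs but still available
    static String parse(String text) {
        try {
            new URL(text);
            return "ok";
        } catch (MalformedURLException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Placeholder address, used only for illustration.
        String realAddress = "https://example.invalid:9200";
        // Same shape as the text in the stack trace: the wrapper's description, not the URL.
        String wrapperText = "RuntimeValueProvider{propertyName=esUri, default=" + realAddress + "}";

        System.out.println(parse(realAddress));
        System.out.println(parse(wrapperText));
    }
}
```

The first call succeeds because the text starts with a valid scheme. The second fails because everything before the first colon, starting with "RuntimeValueProvider{", is not a valid protocol name.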
The solution is simple, though: change the esUri parameter into a construction-time parameter by changing its type from ValueProvider<String> to String. Just be aware that using it as a construction-time parameter means you will need to rebuild the pipeline each time you want to change that parameter. Unfortunately, there is nothing you can do about that until ValueProvider support is added.
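
One side effect of a construction-time String is that the value is known before the job is submitted, so it could be checked in main() rather than failing later on a worker. The sketch below assumes a hypothetical helper, `requireHttpUrl`, that is not part of Beam; it uses the same java.net.URL parsing that appears in the stack trace, and the default address is a placeholder.

```java
import java.net.MalformedURLException;
import java.net.URL;

public class EsUriCheck {

    /**
     * Hypothetical helper: returns the address unchanged if it parses as an
     * http or https URL, otherwise throws IllegalArgumentException.
     */
    @SuppressWarnings("deprecation") // new URL(String) is deprecated on recent JDKs but still available
    static String requireHttpUrl(String address) {
        try {
            String protocol = new URL(address).getProtocol();
            if (!protocol.equals("http") && !protocol.equals("https")) {
                throw new IllegalArgumentException("Expected http or https, got: " + protocol);
            }
            return address;
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid Elasticsearch address: " + address, e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for options.getEsUri() once the option is declared as a plain String.
        String esUri = args.length > 0 ? args[0] : "https://example.invalid:9200";
        System.out.println("Using " + requireHttpUrl(esUri));
    }
}
```

With a check like this, passing the text of a ValueProvider by mistake would be rejected while the pipeline is being built.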