Does ElasticsearchIO for Apache Beam (Java) support templating and ValueProvider arguments? Error while invoking templates


Question

I am trying to create an Apache Beam template that indexes data into Elasticsearch. The template is created successfully, but when I invoke it the pipeline fails with a "no protocol" error. This looks odd, because the error comes from a URL object.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class StarterPipeline {
        private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

        public interface IndexToEsOptions extends PipelineOptions {
            @Description("Path of the gzip index file to read from")
            ValueProvider<String> getInputIndexFile();
            void setInputIndexFile(ValueProvider<String> value);

            @Description("Index name to index with")
            ValueProvider<String> getIndexName();
            void setIndexName(ValueProvider<String> value);

            @Description("Index template name")
            ValueProvider<String> getIndexTemplate();
            void setIndexTemplate(ValueProvider<String> value);

            @Description("URI for es")
            @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
            ValueProvider<String> getEsUri();
            void setEsUri(ValueProvider<String> value);
        }

        public static void main(String[] args) {
            IndexToEsOptions options = PipelineOptionsFactory
                    .fromArgs(args)
                    .withValidation()
                    .as(IndexToEsOptions.class);

            Pipeline p = Pipeline.create(options);
            p.apply(TextIO.read().from(options.getInputIndexFile()))
                    .apply(ElasticsearchIO.write()
                            .withConnectionConfiguration(
                                    ElasticsearchIO.ConnectionConfiguration.create(
                                            new String[]{options.getEsUri().toString()},
                                            options.getIndexName().toString(),
                                            options.getIndexTemplate().toString())
                                            .withConnectTimeout(240))
                            .withMaxBatchSizeBytes(15 * 1024 * 1024));

            p.run();
        }
    }

The error I get when I run it is:

> java.lang.IllegalArgumentException: Cannot get Elasticsearch version
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
> org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
> org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
> org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
> ... 14 more
> Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1475)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
> Caused by: java.net.MalformedURLException: no protocol: RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
> java.base/java.net.URL.<init>(URL.java:627)
> java.base/java.net.URL.<init>(URL.java:523)
> java.base/java.net.URL.<init>(URL.java:470)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$ConnectionConfiguration.createClient(ElasticsearchIO.java:417)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1457)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
> org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
> org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
> org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
> org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
> org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
> org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
> org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
> org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:834)

Answer 1

Score: 1

Put simply, no: ElasticsearchIO.ConnectionConfiguration does not appear to support ValueProviders, at least not as of the current release (2.22.0). You can see this from the signature of ConnectionConfiguration.create:

public static ElasticsearchIO.ConnectionConfiguration create(java.lang.String[] addresses,
                                                             java.lang.String index,
                                                             java.lang.String type)

Compare that with a method that does support ValueProviders, ElasticsearchIO.Read.withQuery:

public ElasticsearchIO.Read withQuery(ValueProvider<java.lang.String> query)

To support ValueProviders, a method must actually accept a ValueProvider object. A ValueProvider exists to deliver the parameter's value at runtime rather than at pipeline construction time, so during construction it has to be passed along as a ValueProvider object everywhere it is used.
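
The idea of a value that is declared at construction time but only readable at runtime can be sketched with plain Java. This is a minimal illustration, not Beam's implementation: `Deferred`, `RuntimeSlot`, and `buildStep` are hypothetical names invented for the sketch, loosely modeled on the `get()` and `isAccessible()` methods that ValueProvider exposes.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class DeferredOptionSketch {

    // Hypothetical stand-in for a runtime-resolved option (NOT Beam's ValueProvider).
    interface Deferred<T> {
        T get();
        boolean isAccessible();
    }

    // Holds no value while the "pipeline" is being built; a value is supplied later.
    static final class RuntimeSlot implements Deferred<String> {
        private final AtomicReference<String> value = new AtomicReference<>();

        void supply(String v) {
            value.set(v);
        }

        @Override
        public boolean isAccessible() {
            return value.get() != null;
        }

        @Override
        public String get() {
            String v = value.get();
            if (v == null) {
                throw new IllegalStateException("value is only available at runtime");
            }
            return v;
        }
    }

    // A step that keeps the Deferred object itself and calls get() only when executed.
    static Supplier<String> buildStep(Deferred<String> indexName) {
        return () -> "indexing into " + indexName.get();
    }

    public static void main(String[] args) {
        RuntimeSlot indexName = new RuntimeSlot();
        Supplier<String> step = buildStep(indexName); // construction time: no value yet
        indexName.supply("my-index");                 // runtime: the value arrives
        System.out.println(step.get());
    }
}
```

The step works only because it was handed the deferred object rather than a string extracted from it during construction.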

In your example, you are calling toString() on the ValueProvider for esUri. Instead of a string containing your URL, you get a string representation of the ValueProvider itself, which looks like this: "RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}". ElasticsearchIO then tries to parse that string as a URL, which fails, and that is why you get a MalformedURLException. The index name and index template options in your code go through toString() in the same way.
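
The failure can be reproduced without Beam. In the sketch below, `FakeRuntimeProvider` is a hypothetical stand-in whose toString() imitates the format seen in the stack trace, and `https://example.invalid` is a placeholder address; only `java.net.URL` is the real API.

```java
import java.net.MalformedURLException;
import java.net.URL;

final class ToStringPitfall {

    // Hypothetical stand-in; its toString() mimics the text shown in the stack trace.
    static final class FakeRuntimeProvider {
        private final String propertyName;
        private final String defaultValue;

        FakeRuntimeProvider(String propertyName, String defaultValue) {
            this.propertyName = propertyName;
            this.defaultValue = defaultValue;
        }

        String get() {
            return defaultValue;
        }

        @Override
        public String toString() {
            return "RuntimeValueProvider{propertyName=" + propertyName
                    + ", default=" + defaultValue + "}";
        }
    }

    // Returns null when the text parses as a URL, otherwise the exception message.
    @SuppressWarnings("deprecation") // the URL(String) constructor is deprecated in recent JDKs
    static String tryParse(String spec) {
        try {
            new URL(spec);
            return null;
        } catch (MalformedURLException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        FakeRuntimeProvider esUri =
                new FakeRuntimeProvider("esUri", "https://example.invalid");
        // The description of the provider is not a URL, so parsing fails.
        System.out.println(tryParse(esUri.toString()));
        // The actual value parses without error.
        System.out.println(tryParse(esUri.get()));
    }
}
```

The text before the first colon has to be a valid scheme name, and "RuntimeValueProvider{propertyName=esUri, default=https" is not one, so the parser reports that there is no protocol.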

The fix is simple: turn esUri into a construction-time parameter by changing its type from ValueProvider<String> to String. The same change is needed for any other option you pass to ConnectionConfiguration.create, such as the index name. Be aware that a construction-time parameter is fixed when the pipeline is built, so you will need to rebuild the pipeline each time you want to change it. Unfortunately, there is nothing you can do about that until ValueProvider support is added.
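
A sketch of the question's pipeline with that change applied might look like the following. It assumes the Beam Java SDK and the ElasticsearchIO module are on the classpath; the class name `IndexToEsPipeline` and the use of `@Validation.Required` are choices made for this sketch, not part of the original code.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;

public class IndexToEsPipeline {

    public interface IndexToEsOptions extends PipelineOptions {
        // TextIO.read().from(...) accepts a ValueProvider, so this stays a runtime parameter.
        @Description("Path of the gzip index file to read from")
        ValueProvider<String> getInputIndexFile();
        void setInputIndexFile(ValueProvider<String> value);

        // Plain String options: fixed at construction time, when the template is built.
        @Description("Index name to index with")
        @Validation.Required
        String getIndexName();
        void setIndexName(String value);

        @Description("Index template name")
        @Validation.Required
        String getIndexTemplate();
        void setIndexTemplate(String value);

        @Description("URI for es")
        @Validation.Required
        String getEsUri();
        void setEsUri(String value);
    }

    public static void main(String[] args) {
        IndexToEsOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(IndexToEsOptions.class);

        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.read().from(options.getInputIndexFile()))
                .apply(ElasticsearchIO.write()
                        .withConnectionConfiguration(
                                // Real strings now, so no toString() on a ValueProvider.
                                ElasticsearchIO.ConnectionConfiguration.create(
                                        new String[]{options.getEsUri()},
                                        options.getIndexName(),
                                        options.getIndexTemplate())
                                        .withConnectTimeout(240))
                        .withMaxBatchSizeBytes(15 * 1024 * 1024));

        p.run();
    }
}
```

With this layout, only the input file can vary between invocations of the same template; the Elasticsearch address, index name, and template name are baked in when the template is created.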

huangapple
  • Posted on July 23, 2020, 06:43:15
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63044207.html