Local Pubsub Emulator won't work with Dataflow

Question


I am developing a Dataflow pipeline in Java whose input comes from Pubsub. I later found a guide on how to use the local Pubsub emulator, so that I would not need to deploy to GCP in order to test.

Here is my simple code:

private interface Options extends PipelineOptions, PubsubOptions, StreamingOptions {

    @Description("Pub/Sub topic to read messages from")
    String getTopic();
    void setTopic(String topic);

    @Description("Pub/Sub subscription to read messages from")
    String getSubscription();
    void setSubscription(String subscription);

    @Description("Local file output")
    String getOutput();
    void setOutput(String output);
}

public static void main(String[] args) {

    Options options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(Options.class);
    options.setStreaming(true);
    options.setPubsubRootUrl("localhost:8085");

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        // other .apply's

    pipeline.run();
}

I was able to follow the guide, including the part that uses the example Python code to create a topic, a subscription, and a publisher, and even to publish messages. When I use the Python code to interact with the Pubsub emulator, I notice the message `Detected HTTP/2 connection` in the command line where I run the emulator:

Executing: cmd /c C:\...\google-cloud-sdk\platform\pubsub-emulator\bin\cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Apr 10, 2020 3:33:26 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Apr 10, 2020 3:33:26 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Apr 10, 2020 3:33:27 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.

I compiled and ran the code in Eclipse using the Dataflow Pipeline run configuration, but I get the following error:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create subscription: 
...
Caused by: java.lang.RuntimeException: Failed to create subscription: 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.createRandomSubscription(PubsubUnboundedSource.java:1427)
...
Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: unknown protocol: localhost
...
Caused by: java.net.MalformedURLException: unknown protocol: localhost

When I add the `http://` prefix in the line `options.setPubsubRootUrl("localhost:8085")`, I get an endlessly repeated exception:

com.google.api.client.http.HttpRequest execute
WARNING: exception thrown while executing request
java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    at java.net.PlainSocketImpl.connect(Unknown Source)
    at java.net.SocksSocketImpl.connect(Unknown Source)

It seems to reach the Pubsub emulator but cannot connect, because the command line where I run the emulator also prints this endlessly:

[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.

How can I make my Dataflow pipeline work with the Pubsub emulator?

Answer 1

Score: 4


You are attempting to connect to the Pubsub emulator from the Beam Direct Runner, using the Dataflow fork of the Beam 2.5 SDK. The Dataflow 2.5 SDK and the Eclipse plugin were deprecated as of June 6, 2019. However, this should still work.

As you've discovered, in Beam you need to prefix your PubsubRootUrl with `http://`. The second problem you are seeing indicates that nothing is listening on localhost:8085. This is likely because localhost can actually refer to two addresses, one IPv4 and one IPv6: the Pubsub emulator only listens on IPv4, and Windows tries IPv6 first. Try replacing localhost with 127.0.0.1 to force IPv4. You should end up with this:

options.setPubsubRootUrl("http://127.0.0.1:8085");
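
Both points can be checked without Beam. The following is a minimal sketch that uses only the Java standard library; `PubsubRootUrlCheck`, `parseError`, and `resolve` are hypothetical names introduced here for illustration and are not part of Beam or of the original code. It assumes the root URL ends up being parsed by `java.net.URL`, which is what the `MalformedURLException` in the stack trace suggests. The first helper shows why a value without a scheme is rejected, and the second lists the addresses that a host name resolves to on the current machine.

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Beam or of the question's pipeline.
final class PubsubRootUrlCheck {

    /** Returns null if rootUrl parses as a URL, otherwise the parser's error message. */
    static String parseError(String rootUrl) {
        try {
            new URL(rootUrl);
            return null;
        } catch (MalformedURLException e) {
            return e.getMessage();
        }
    }

    /** Lists every address the host resolves to, tagged with its IP family. */
    static List<String> resolve(String host) {
        List<String> result = new ArrayList<>();
        try {
            for (InetAddress address : InetAddress.getAllByName(host)) {
                String family = (address instanceof Inet6Address) ? "IPv6" : "IPv4";
                result.add(family + " " + address.getHostAddress());
            }
        } catch (UnknownHostException e) {
            result.add("unresolved: " + e.getMessage());
        }
        return result;
    }

    public static void main(String[] args) {
        // No scheme: the parser treats "localhost" as the protocol and rejects it.
        System.out.println(parseError("localhost:8085"));
        // Scheme plus IPv4 literal: parses cleanly, so this prints "null".
        System.out.println(parseError("http://127.0.0.1:8085"));
        // Machine-dependent: may list an IPv4 address, an IPv6 address, or both.
        System.out.println(resolve("localhost"));
    }
}
```

If `resolve("localhost")` lists an IPv6 address on your machine, that is consistent with the explanation above, and using the literal 127.0.0.1 sidesteps name resolution entirely.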

Answer 2

Score: 0


Besides setting the root URL, you also need to provide a credential factory. When working with the emulator, you don't need any credentials. You can do this either in code (by setting the options manually) or by passing command-line options. The latter keeps your code clean.

Code:

options.setPubsubRootUrl("http://127.0.0.1:8085");
options.setCredentialFactoryClass(NoCredentialsFactory.class);

Command line options:

--pubsubRootUrl=http://127.0.0.1:8085
--credentialFactoryClass=ca.dataedu.dataflow.otlpdemo.NoCredentialsFactory

The `NoCredentialsFactory` class looks something like this:

import com.google.auth.Credentials;
import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

public class NoCredentialsFactory implements CredentialFactory {

    private static final NoCredentialsFactory INSTANCE = new NoCredentialsFactory();

    public static NoCredentialsFactory fromOptions(PipelineOptions options) {
        return INSTANCE;
    }

    @Override
    public @Nullable Credentials getCredential() {
        return null;
    }
}

huangapple
  • Published on April 10, 2020, 17:18:24
  • When reposting, please keep the link to this article: https://go.coder-hub.com/61137292.html