How to use Cassandra sink with TestContainers in Flink

Question

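Your test code has a few problems that cause the errors. They fall mainly into the following areas:

  1. NullPointerException: a java.lang.NullPointerException is thrown in your test. This usually means that some object was not initialized or is null. Here it is thrown while your Cassandra sink is being closed (CassandraSinkBase.close calling checkAsyncErrors). It is most likely a follow-on error: the sink's open() failed with the NoHostAvailableException below, so close() runs on a sink that was never fully initialized.

  2. Cassandra connection problem: the log shows a com.datastax.driver.core.exceptions.NoHostAvailableException, which means no host was available when the sink tried to connect to Cassandra. The driver tried localhost/127.0.0.1:9042, the default port, but Testcontainers maps the container's port 9042 to a random port on the host, so the sink is not connecting to the container's mapped port.

  3. Cassandra container configuration: you use TestContainers to run a Cassandra container in your test, so make sure the container is configured correctly and running. You also need to check your Cassandra connection configuration and make sure it matches the container's address and port.

  4. Cassandra init script: your container configuration includes an init script, testInit3.cql. Make sure this script initializes the Cassandra database correctly so that your test can run.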

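To fix these problems, you can follow these steps:

  • Make sure the Cassandra container is configured correctly and running.
  • Verify your Cassandra connection configuration and make sure it matches the container's address and port.
  • Check your init script testInit3.cql and make sure it initializes the Cassandra database correctly.
  • Check the Cassandra sink documentation to make sure you are using it correctly.

These steps should help you resolve the problems in your test. If the problem persists, examine your code and configuration in more detail to find the root cause.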


I'm trying to test a Cassandra sink using TestContainers in a simple Flink pipeline that uses DataStreamTestBase for tests:

public class CassandraPojoSinkExampleTest extends DataStreamTestBase {

    @Rule
    public CassandraContainer cassandra = new CassandraContainer<>().withInitScript("testInit3.cql")
            .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig()
                    .withCpuCount(2L));

    @Override
    public void initialize() throws Exception {
        super.initialize();
        testEnv.getConfig().disableObjectReuse();
        testEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        testEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.of(1, TimeUnit.SECONDS)));
    }

    @Test
    public void test() throws Throwable {
        ArrayList<Message> messages = new ArrayList<>(20);

        for (long i = 0; i < 20; i++) {
            messages.add(new Message("cassandra-" + i));
        }

        Cluster cluster = cassandra.getCluster();

        try (Session session = cluster.connect()) {

            CassandraPojoSinkExample cassandraPojoSinkExample = new CassandraPojoSinkExample();
            cassandraPojoSinkExample.doIt(cassandra.getHost(), testEnv.fromCollection(messages));

            MappingManager manager = new MappingManager(session);
            Mapper<Message> mapper = manager.mapper(Message.class);
            String word = "cassandra-1";
            Message message = mapper.get(word);
            System.out.println(message);
        }
    }
}

The Flink pipeline looks like this (it is only the sink, in the doIt method):

public class CassandraPojoSinkExample {
    private static final ArrayList<Message> messages = new ArrayList<>(20);

    static {
        for (long i = 0; i < 20; i++) {
            messages.add(new Message("cassandra-" + i));
        }
    }
    public static void main(String[] args) throws Exception {

        // the main method is not used in tests

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Message> source = env.fromCollection(messages);

        CassandraPojoSinkExample cassandraPojoSinkExample = new CassandraPojoSinkExample();
        cassandraPojoSinkExample.doIt("127.0.0.1", source);
    }

    public void doIt(String address, DataStreamSource<Message> source) throws Exception {

        CassandraSink.addSink(source)
                .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Builder builder) {
                        return builder.addContactPoint(address).build();
                    }
                })
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();
    }
}

When I run the test, I get this error:

16:41:57,675 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
java.lang.NullPointerException
	at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.checkAsyncErrors(CassandraSinkBase.java:162)
	at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:96)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748)
16:41:57,683 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Collection Source -> Sink: Cassandra Sink (1/1) (721a02e3c1345d640bf03b67eff0aba7) switched from RUNNING to FAILED.
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Cannot connect))
	at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
	at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
	at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
	at com.datastax.driver.core.Cluster.init(Cluster.java:162)
	at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
	at com.datastax.driver.core.Cluster.connect(Cluster.java:283)
	at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.createSession(CassandraPojoSink.java:123)
	at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
	at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:106)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748)

What am I doing wrong here?

Answer 1

Score: 1


Judging from the stack trace above (com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 ...), it seems that the Cassandra host is not available.

I would say you need to expose the port to the host:

    @Rule
    public CassandraContainer cassandra = new CassandraContainer<>().withInitScript("testInit3.cql")
            .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig()
                    .withCpuCount(2L))
            .withExposedPorts(9042);

Also, I would manually check the corresponding host and port (for example with telnet) to make sure that the service has started.
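If telnet is not at hand, the same reachability check can be done from Java. The following is a minimal sketch using only java.net.Socket; the class and method names (PortCheck, isReachable) are hypothetical and the 2-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /**
     * Returns true if a TCP connection to host:port can be opened within timeoutMillis.
     * This is roughly what "telnet host port" tells you by hand.
     */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, ...
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9042;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

Inside the test, the arguments would be cassandra.getHost() and cassandra.getMappedPort(9042) rather than localhost and 9042, because Testcontainers maps the container's port 9042 to a random port on the host.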

Answer 2

Score: 1


The answer from @Mikalai Lushchytski was good; the problem was the exposed port.

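The mapped port (cassandra.getMappedPort(9042)) also has to be passed to the sink in the doIt method, for example with .setHost(host, port) on the CassandraSink builder, or with builder.addContactPoint(address).withPort(port) inside the ClusterBuilder (setHost() and setClusterBuilder() cannot both be used on the same builder).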

To be honest, I had already tried this before but didn't notice that it worked, because the remaining problem was reading data back from the Cassandra table. With the code above I wasn't able to retrieve any rows from the database during the test; the problem seems to be DataStreamTestBase and the order in which its methods are called. I confirmed that the pipeline works by connecting to the database manually and querying the data while the test was running.
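The ordering problem can be illustrated without Flink or Cassandra. The sketch below is a simulation, not DataStreamTestBase itself: it assumes the base class runs the pipeline in an after-test hook, and all names (LifecycleDemo, FakePipeline, BaseTest, QueryTooEarly, ExecuteFirst) are hypothetical. A list stands in for the Cassandra table.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {

    // Stand-in for the Cassandra table that the sink writes to.
    static final List<String> database = new ArrayList<>();

    // Stand-in for the Flink job: records are only written when it is executed.
    static class FakePipeline {
        private final List<String> pending = new ArrayList<>();

        void addRecords(List<String> records) {
            pending.addAll(records);
        }

        void execute() {
            database.addAll(pending); // executing twice would write the rows twice
        }
    }

    // Stand-in for the test base class: a setup hook and an after-test hook.
    public abstract static class BaseTest {
        FakePipeline env;

        void initialize() {            // plays the role of @Before
            env = new FakePipeline();
        }

        void executeTest() {           // plays the role of @After
            env.execute();
        }

        abstract List<String> test();  // plays the role of @Test; returns what the query saw
    }

    // Calls the hooks in the order JUnit would and returns what the test body observed.
    public static List<String> run(BaseTest t) {
        database.clear();
        t.initialize();
        List<String> observed = t.test();
        t.executeTest();
        return observed;
    }

    // Original approach: the query runs before the after-test hook executes the job.
    public static class QueryTooEarly extends BaseTest {
        @Override
        List<String> test() {
            env.addRecords(List.of("cassandra-0", "cassandra-1"));
            return new ArrayList<>(database); // nothing has been written yet
        }
    }

    // Approach from this answer: execute inside the test and skip the after-test hook.
    public static class ExecuteFirst extends BaseTest {
        private boolean runTestAfterMethod = true;

        @Override
        void executeTest() {
            if (runTestAfterMethod) {
                super.executeTest();
            }
        }

        @Override
        List<String> test() {
            runTestAfterMethod = false;
            env.addRecords(List.of("cassandra-0", "cassandra-1"));
            env.execute();                    // the job runs before the query
            return new ArrayList<>(database); // the written rows are visible
        }
    }

    public static void main(String[] args) {
        System.out.println("query before execution: " + run(new QueryTooEarly()));
        System.out.println("query after execution:  " + run(new ExecuteFirst()));
    }
}
```

Running main prints an empty list for the first case and both records for the second; the flag only keeps the after-test hook from executing the job a second time.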

The solution for reading database records back from the Cassandra TestContainer when using DataStreamTestBase is to override the executeTest() method:

public class CassandraPojoSinkExampleTest extends DataStreamTestBase {
    private static boolean runTestAfterMethod = true;

    @Rule
    public CassandraContainer cassandra = new CassandraContainer<>()
            .withInitScript("testInit3.cql")
            .withExposedPorts(9042);

    @Before
    public void setUp() {
        runTestAfterMethod = true;
    }

    // DataStreamTestBase also calls executeTest() after each test method;
    // skip it when the test has already executed the pipeline itself.
    @Override
    public void executeTest() throws Throwable {
        if (runTestAfterMethod) {
            super.executeTest();
        }
    }

    @Override
    public void initialize() throws Exception {
        super.initialize();
        testEnv.getConfig().disableObjectReuse();
        testEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.of(1, TimeUnit.SECONDS)));
    }

    @Test
    public void test() throws Throwable {
        runTestAfterMethod = false;

        ArrayList<Message> messages = new ArrayList<>(20);
        for (long i = 0; i < 20; i++) {
            messages.add(new Message("cassandra-" + i));
        }

        DataStreamSource<Message> source = testEnv.fromCollection(messages);

        CassandraPojoSinkExample cassandraPojoSinkExample = new CassandraPojoSinkExample();
        cassandraPojoSinkExample.doIt(cassandra.getHost(), cassandra.getMappedPort(9042), source);

        try (Session session = cassandra.getCluster().newSession()) {
            testEnv.executeTest();     // Crucial thing: run the pipeline before querying

            ResultSet resultSet = session.execute("select * from test.features;");
            List<Row> rows = resultSet.all();

            // One row per message; assumes the message text is the first column returned by select *
            List<String> actualRecords = new ArrayList<>();
            for (Row row : rows) {
                actualRecords.add(row.getString(0));
            }

            List<String> expectedRecords = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                expectedRecords.add("cassandra-" + i);
            }

            assertTrue("List equality without order",
                    actualRecords.containsAll(expectedRecords) && expectedRecords.containsAll(actualRecords));
        }
    }
}

This way the query runs only after the pipeline has been executed, so the result set is no longer empty and the test steps run in the correct order.
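The assertion above checks "List equality without order" with containsAll. containsAll only checks one direction and ignores how often an element occurs, so a list with extra or repeated rows can still pass. A stricter sketch compares element counts using only the JDK; the class and method names (UnorderedCompare, sameElementsIgnoringOrder) are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnorderedCompare {

    /** Counts how many times each element occurs in the list. */
    static <T> Map<T, Integer> counts(List<T> items) {
        Map<T, Integer> result = new HashMap<>();
        for (T item : items) {
            result.merge(item, 1, Integer::sum);
        }
        return result;
    }

    /**
     * True if both lists contain the same elements with the same multiplicities,
     * regardless of order. Unlike a.containsAll(b) alone, this also rejects
     * extra or repeated elements in either list.
     */
    public static <T> boolean sameElementsIgnoringOrder(List<T> a, List<T> b) {
        return a.size() == b.size() && counts(a).equals(counts(b));
    }

    public static void main(String[] args) {
        List<String> expected = List.of("cassandra-0", "cassandra-1", "cassandra-2");
        System.out.println(sameElementsIgnoringOrder(List.of("cassandra-2", "cassandra-0", "cassandra-1"), expected));
        System.out.println(sameElementsIgnoringOrder(List.of("cassandra-0", "cassandra-0", "cassandra-1"), expected));
    }
}
```

In the test, the assertion would then become assertTrue(UnorderedCompare.sameElementsIgnoringOrder(actualRecords, expectedRecords)). Since Cassandra upserts rows with the same primary key, repeated rows are unlikely here, but the size check still catches unexpected extra rows.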

  • Posted by huangapple on 31 August 2020 at 23:08:27
  • Please keep the link to this article when reposting: https://go.coder-hub.com/63673374.html