FlinkCDC sink to Elasticsearch fails with NoClassDefFoundError: org.elasticsearch.common.unit.TimeValue

Question

I see that you are encountering a class loading issue with Elasticsearch when trying to use the Flink Elasticsearch connector. The error message suggests that it cannot find the class org.elasticsearch.common.unit.TimeValue.

To resolve this issue, you should make sure that you have the correct version of the Elasticsearch library on your classpath. In your Maven dependencies, you have defined the version of Elasticsearch as 7.17.9. However, it seems that there may be a version mismatch or missing dependency.

Here are some steps you can take to troubleshoot and fix the issue:

  1. Check Elasticsearch Version:
    • Ensure that the Elasticsearch version you are using in your Flink application matches the version of the Elasticsearch library you have specified in your dependencies (i.e., both should be 7.17.9).
  2. Update Dependencies:
    • Double-check all your dependencies to make sure you don't have conflicting versions of Elasticsearch or its dependencies. Ensure that you have only one version of the Elasticsearch library on your classpath.
  3. Dependency Exclusions:
    • If you are using a specific version of Flink that includes Elasticsearch dependencies, consider excluding the Elasticsearch dependencies from your Flink dependencies to avoid conflicts. You can use Maven's exclusion mechanism for this purpose.
  4. Clean and Rebuild:
    • Sometimes, class loading issues can be resolved by cleaning your project and rebuilding it. Remove the target or out directory and rebuild your project.
  5. Check Elasticsearch Sink Configuration:
    • Verify that your Elasticsearch sink configuration is correct, and that there are no typos or errors in the Elasticsearch hosts and settings.
  6. Check for Elasticsearch Plugins:
    • If you are using any Elasticsearch plugins or custom configurations, ensure that they are compatible with the Elasticsearch version you are using.
  7. Flink Compatibility:
    • Make sure that the Flink version you are using is compatible with the Elasticsearch connector version you are using. Some connector versions may not work correctly with specific Flink versions.
  8. Logs and Debugging:
    • Review the logs in more detail to identify any other error messages or stack traces that might provide additional information about the issue.

By following these steps and ensuring that your dependencies are correctly configured and aligned with the Elasticsearch version you intend to use, you should be able to resolve the class loading issue and successfully use the Flink Elasticsearch connector.
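
To make the "only one version on your classpath" check concrete, here is a minimal sketch that lists every classpath location providing a given class. `ClasspathProbe` and `locations` are hypothetical names chosen for illustration; the sketch uses only standard JDK calls and assumes it is run with the same classpath as the Flink job.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

/** Hypothetical helper: shows which classpath entries provide a given class. */
final class ClasspathProbe {

    /** Returns every location that contains the class file; an empty list means the class is missing. */
    static List<String> locations(String className) {
        // A class is stored as a resource named after its binary name, e.g. a/b/C.class
        String resource = className.replace('.', '/') + ".class";
        List<String> found = new ArrayList<>();
        try {
            Enumeration<URL> urls = ClassLoader.getSystemClassLoader().getResources(resource);
            while (urls.hasMoreElements()) {
                found.add(urls.nextElement().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return found;
    }

    public static void main(String[] args) {
        // Default to the class named in the error message; pass another name as the first argument
        String name = args.length > 0 ? args[0] : "org.elasticsearch.common.unit.TimeValue";
        List<String> found = locations(name);
        if (found.isEmpty()) {
            System.out.println(name + " is not on the classpath");
        } else {
            found.forEach(location -> System.out.println(name + " -> " + location));
        }
    }
}
```

An empty result would match the `ClassNotFoundException`, while more than one location would point to conflicting jars.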

Original question (in English):

**Something goes wrong when I try to sink a DataStream to Elasticsearch 7.17.9.**

Here are my Maven dependencies:

    <properties>
        <java.version>1.8</java.version>
        <elasticsearch.version>7.17.9</elasticsearch.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.16.2</flink.version>
        <flink.client.version>1.14.6</flink.client.version>
    </properties>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>uk.co.jemos.podam</groupId>
            <artifactId>podam</artifactId>
            <version>7.2.11.RELEASE</version>
        </dependency>

        <!-- sink function section begins -->


<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-java</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-streaming-java_2.12</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-clients_2.12</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-elasticsearch7_2.12</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->






        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.5.0</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.9</version>
        </dependency>


        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.33</version>
        </dependency>

My key code is:

    log.info("Starting the Flink sink to Elasticsearch job!");
        SourceFunction<DataChangeDTO> sqlServerSource =
                SqlServerSource.<DataChangeDTO>builder()
                        .hostname(sourceConfiguration.getHostName())
                        .port(sourceConfiguration.getPort())
                        .database(sourceConfiguration.getDatabase())
                        .tableList(String.join(",", sourceConfiguration.getTableList()))
                        .username(sourceConfiguration.getUserName())
                        .password(sourceConfiguration.getPassword())
                        .deserializer(new CustomJsonDebeziumDeserializationSchema())
                        //.startupOptions(StartupOptions.initial())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //enable checkpoint
        env.enableCheckpointing(3000);
        // set the REST port (this Configuration is not passed to the environment in this snippet)
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8080);
        HttpHost httpHost = new HttpHost(targetConfiguration.getHostName(), targetConfiguration.getPort(), "http");
        //ElasticsearchSink.Builder<DataChangeDTO> esSinkBuilder = new ElasticsearchSink.Builder<>(ListUtil.of(httpHost), new CustomElasticSinkFunction());
        //esSinkBuilder.setBulkFlushMaxActions(1);

        env
                .addSource(sqlServerSource)
                .filter(new DataStreamFilter())
                .sinkTo(
                        new Elasticsearch7SinkBuilder<DataChangeDTO>()
                                // This setting makes the sink flush after every element; otherwise elements would be buffered
                                .setBulkFlushMaxActions(1)
                                .setHosts(httpHost)
                                .setEmitter(
                                        (element, context, indexer) ->
                                                indexer.add(createIndexRequest(element)))
                                .build())
                //.print()
                //.addSink(esSinkBuilder.build())
                .setParallelism(1)
                .name("flinkCDC to elasticsearch process");
                //.setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute(" SqlServer to Elastic ");

And here is the error detail:

Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/unit/TimeValue
	at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:109) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
	at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:69) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
	at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.createBulkProcessor(ElasticsearchWriter.java:198) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
	at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.<init>(ElasticsearchWriter.java:105) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
	at org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createWriter(ElasticsearchSink.java:90) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-runtime-1.16.2.jar:1.16.2]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-runtime-1.16.2.jar:1.16.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-runtime-1.16.2.jar:1.16.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-runtime-1.16.2.jar:1.16.2]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.unit.TimeValue
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_292]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_292]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[na:1.8.0_292]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_292]
	... 19 common frames omitted

I can't figure it out. Many thanks.

I also tried other approaches, such as:

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-base</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-streaming-java_2.12</artifactId>-->
<!--            <version>1.14.6</version>-->
<!--            <scope>provided</scope>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--        </dependency>-->


<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-clients_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.client.version}</version>-->
<!--            <exclusions>-->
<!--                <exclusion>-->
<!--                    <artifactId>slf4j-api</artifactId>-->
<!--                    <groupId>org.slf4j</groupId>-->
<!--                </exclusion>-->
<!--            </exclusions>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>org.elasticsearch</groupId>-->
<!--            <artifactId>elasticsearch</artifactId>-->
<!--            <version>7.17.9</version>-->
<!--        </dependency>-->

I also used env.addSink instead of sinkTo, but the problem still happened.

I also tried removing the CDC step and sinking a bounded stream to Elasticsearch, and I got the same error. Please see the picture:
[image]

Answer 1

Score: 0

The Elasticsearch version that you're trying to use is not compatible with the Flink Elasticsearch connector. The 1.14 version of the connector uses Elasticsearch 7.5.1, as can be seen in https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-elasticsearch7/pom.xml#L40

Flink can't upgrade to 7.17.9, because that newer version uses the SSPL license, which is incompatible with the Apache 2.0 license.
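
One way to see such an incompatibility on your own classpath is to probe for the class under more than one name. The sketch below is hypothetical (`TimeValueCheck` and `firstPresent` are illustrative names). It also assumes, without confirmation from this thread, that newer 7.x Elasticsearch jars provide the class as `org.elasticsearch.core.TimeValue` rather than under the name shown in the stack trace.

```java
/** Hypothetical check: which candidate class name resolves on the current classpath? */
final class TimeValueCheck {

    /** Returns the first class name that can be loaded, or null if none can. */
    static String firstPresent(String... candidates) {
        for (String name : candidates) {
            try {
                // initialize=false: only test for presence, do not run static initializers
                Class.forName(name, false, TimeValueCheck.class.getClassLoader());
                return name;
            } catch (ClassNotFoundException | LinkageError e) {
                // not available under this name; try the next candidate
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String found = firstPresent(
                "org.elasticsearch.common.unit.TimeValue", // name the connector asks for (from the stack trace)
                "org.elasticsearch.core.TimeValue");       // ASSUMPTION: location in newer 7.x jars
        System.out.println(found == null ? "No TimeValue class found" : "Found " + found);
    }
}
```

If only the second name resolves, the Elasticsearch jars on the classpath are probably newer than the ones the connector was compiled against, and aligning them with the connector's own Elasticsearch version would be the thing to try.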

Posted by huangapple on June 19, 2023, 17:02:16
Source: https://go.coder-hub.com/76505138.html