FlinkCDC sink to elasticsearch org.elasticsearch.common.unit.TimeValue
I see that you are encountering a class-loading issue with Elasticsearch when trying to use the Flink Elasticsearch connector. The error message suggests that it cannot find the class org.elasticsearch.common.unit.TimeValue.
To resolve this issue, make sure that you have the correct version of the Elasticsearch library on your classpath. In your Maven dependencies you have defined the Elasticsearch version as 7.17.9. However, it seems that there may be a version mismatch or a missing dependency.
Here are some steps you can take to troubleshoot and fix the issue:
- **Check the Elasticsearch version:** Ensure that the Elasticsearch version you are using in your Flink application matches the version of the Elasticsearch library specified in your dependencies (i.e., both should be 7.17.9).
- **Update dependencies:** Double-check all your dependencies to make sure you don't have conflicting versions of Elasticsearch or its dependencies. Ensure that only one version of the Elasticsearch library ends up on your classpath.
- **Dependency exclusions:** If you are using a Flink version that includes Elasticsearch dependencies, consider excluding the Elasticsearch dependencies from your Flink dependencies to avoid conflicts. You can use Maven's exclusion mechanism for this purpose (see the POM sketch after this list).
- **Clean and rebuild:** Sometimes class-loading issues can be resolved by cleaning your project and rebuilding it. Remove the target or out directory and rebuild your project.
- **Check the Elasticsearch sink configuration:** Verify that your Elasticsearch sink configuration is correct and that there are no typos or errors in the Elasticsearch hosts and settings.
- **Check for Elasticsearch plugins:** If you are using any Elasticsearch plugins or custom configurations, ensure that they are compatible with the Elasticsearch version you are using.
- **Flink compatibility:** Make sure that the Flink version you are using is compatible with the Elasticsearch connector version; certain connector versions may not work correctly with specific Flink versions.
- **Logs and debugging:** Review the logs in more detail to identify any other error messages or stack traces that might provide additional information about the issue.
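To make the exclusion and version-alignment steps concrete, here is a minimal POM sketch. It is only an illustration: which dependency actually drags in a conflicting or missing org.elasticsearch:elasticsearch (and therefore whether an exclusion is needed at all) should be confirmed with mvn dependency:tree, and the exclusion target below is an assumption rather than something taken from the question.

<!-- Illustration only: exclude the Elasticsearch jars pulled in transitively by one
     dependency, then declare a single Elasticsearch version explicitly so that exactly
     one copy ends up on the classpath. Verify the real conflict with: mvn dependency:tree -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7</artifactId>
  <version>${flink.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- One explicit Elasticsearch dependency at the version you intend to use -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>${elasticsearch.version}</version>
</dependency>

Note that whatever version you pin here still has to be one the connector supports; see the answer below regarding the client version the connector is built against.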
By following these steps and ensuring that your dependencies are correctly configured and aligned with the Elasticsearch version you intend to use, you should be able to resolve the class loading issue and successfully use the Flink Elasticsearch connector.
Question
**I got something wrong when I tried to sink the datastream to Elasticsearch 7.17.9.**
Here are my Maven dependencies:
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.17.9</elasticsearch.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.16.2</flink.version>
<flink.client.version>1.14.6</flink.client.version>
</properties>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>uk.co.jemos.podam</groupId>
<artifactId>podam</artifactId>
<version>7.2.11.RELEASE</version>
</dependency>
<!-- sink function section start -->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-java</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-java_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-elasticsearch7_2.12</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.9</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.33</version>
</dependency>
My key code is:
log.info("Starting the flink sink to elasticsearch job!");
SourceFunction<DataChangeDTO> sqlServerSource =
SqlServerSource.<DataChangeDTO>builder()
.hostname(sourceConfiguration.getHostName())
.port(sourceConfiguration.getPort())
.database(sourceConfiguration.getDatabase())
.tableList(String.join(",", sourceConfiguration.getTableList()))
.username(sourceConfiguration.getUserName())
.password(sourceConfiguration.getPassword())
.deserializer(new CustomJsonDebeziumDeserializationSchema())
//.startupOptions(StartupOptions.initial())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//enable checkpoint
env.enableCheckpointing(3000);
// set the source parallelism to 2
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8080);
HttpHost httpHost = new HttpHost(targetConfiguration.getHostName(), targetConfiguration.getPort(), "http");
//ElasticsearchSink.Builder<DataChangeDTO> esSinkBuilder = new ElasticsearchSink.Builder<>(ListUtil.of(httpHost), new CustomElasticSinkFunction());
//esSinkBuilder.setBulkFlushMaxActions(1);
env
.addSource(sqlServerSource)
.filter(new DataStreamFilter())
.sinkTo(
new Elasticsearch7SinkBuilder<DataChangeDTO>()
// the setting below makes the sink commit immediately after receiving each element; otherwise elements are buffered
.setBulkFlushMaxActions(1)
.setHosts(httpHost)
.setEmitter(
(element, context, indexer) ->
indexer.add(createIndexRequest(element)))
.build())
//.print()
//.addSink(esSinkBuilder.build())
.setParallelism(1)
.name("flinkCDC to elasticsearch process");
//.setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute(" SqlServer to Elastic ");
and here is the error detail I got:
Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/unit/TimeValue
at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:109) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder$1.apply(Elasticsearch7SinkBuilder.java:69) ~[flink-connector-elasticsearch7-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.createBulkProcessor(ElasticsearchWriter.java:198) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.<init>(ElasticsearchWriter.java:105) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink.createWriter(ElasticsearchSink.java:90) ~[flink-connector-elasticsearch-base-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-runtime-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-runtime-1.16.2.jar:1.16.2]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_292]
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.unit.TimeValue
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_292]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[na:1.8.0_292]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_292]
... 19 common frames omitted
I cannot figure it out, many thanks.
I tried other ways as well, for example:
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-base</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-streaming-java_2.12</artifactId>-->
<!-- <version>1.14.6</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-clients_${scala.binary.version}</artifactId>-->
<!-- <version>${flink.client.version}</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <artifactId>slf4j-api</artifactId>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.elasticsearch</groupId>-->
<!-- <artifactId>elasticsearch</artifactId>-->
<!-- <version>7.17.9</version>-->
<!-- </dependency>-->
and used env.addSink instead of sinkTo, but the problem still happened.
I also tried removing the CDC step and building a bounded stream that sinks to Elasticsearch, and I got the same error, please see the picture:
[screenshot showing the same error]
Answer 1
Score: 0
The Elasticsearch version that you're trying to use is not compatible with the Flink Elasticsearch connector. The 1.14 version of that connector uses 7.5.1, as can be seen in https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-elasticsearch7/pom.xml#L40
Flink can't upgrade to 7.17.9, because that newer version uses the SSPL license which is incompatible with the Apache 2.0 license.
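As a concrete illustration of that version constraint, here is a minimal POM sketch that pins the Elasticsearch client artifacts to the version the connector is actually built against instead of 7.17.9. The exact number should be read from the pom of the connector you use (the link above shows 7.5.1 for the 1.14 connector); the coordinates below are an assumption for illustration, not taken from the question.

<!-- Sketch only: declare the REST high-level client at the connector's own
     Elasticsearch version instead of 7.17.9. Replace 7.5.1 with whatever the
     connector pom you are actually using references. -->
<properties>
  <elasticsearch.version>7.5.1</elasticsearch.version>
</properties>
<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>${elasticsearch.version}</version>
</dependency>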