Apache Flume Java client fails to start with a single Kafka sink
Question
I'm using org.apache.flume.agent.embedded.EmbeddedAgent, configured as follows:
Map<String, String> configurationProperties = ...;
service.configure(configurationProperties);
where configurationProperties is set to:
{
"kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000",
"processor.type": "load_balance",
"sinks": "kafkaSink1",
"channel.keep-alive": "0",
"channel.checkpointDir": "********************************",
"kafkaSink.kafka.producer.reconnect.backoff.ms": "2000",
"channel.dataDirs": "********************************",
"kafkaSink.kafka.producer.retry.backoff.ms": "1000",
"processor.selector.maxTimeOut": "60000",
"kafkaSink.kafka.producer.max.request.size": "5485760",
"kafkaSink1.flumeBatchSize": "1000",
"kafkaSink.kafka.producer.buffer.memory": "67108864",
"kafkaSink.kafka.producer.client.id": "********************************",
"kafkaSink1.useFlumeEventFormat": "true",
"kafkaSink1.kafka.topic": "********************************",
"kafkaSink.kafka.producer.batch.size": "8196",
"channel.kafka.dataDirs": "********************************",
"kafkaSink1.type": "KAFKA",
"channel.backupCheckpointDir": "********************************",
"kafkaSink1.allowTopicOverride": "true",
"channel.useDualCheckpoints": "true",
"kafkaSink.kafka.producer.compression.type": "lz4",
"processor.maxBackoff": "60000",
"use_dual_channel": "true",
"channel.capacity": "1000000",
"channel.byteCapacityBufferPercentage": "50",
"channel.transactionCapacity": "1000",
"channel.byteCapacity": "10485760",
"channel.type": "file",
"processor.backoff": "true",
"channel.kafka.checkpointDir": "********************************",
"channel.kafka.backupCheckpointDir": "********************************",
"kafkaSink1.kafka.bootstrap.servers": "********************************",
"kafkaSink.kafka.producer.acks": "-1"
}
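In the map above, the only sink declared in `sinks` is `kafkaSink1`, but the producer settings use the prefix `kafkaSink.`, and `use_dual_channel` carries no component prefix at all. Here is a minimal sketch, assuming the embedded agent assigns each key to a component purely by its leading prefix (`sinks`, `source.`, `channel.`, `processor.`, or `<sinkName>.` for each name listed in `sinks`), that lists keys matching none of those. `findUnroutedKeys` is a hypothetical helper, not part of Flume, and whether a given Flume release rejects, ignores, or mis-routes such keys is not established here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class EmbeddedKeyPrefixCheck {
    // Component prefixes ASSUMED from the keys used in the question; adjust for your Flume version.
    private static final List<String> COMPONENT_PREFIXES = List.of("source.", "channel.", "processor.");

    // Hypothetical helper: returns keys that are neither "sinks", a component key,
    // nor prefixed by "<sinkName>." for a sink name declared in "sinks".
    static List<String> findUnroutedKeys(Map<String, String> props) {
        String sinksValue = props.get("sinks");
        Set<String> sinkNames = new HashSet<>();
        if (sinksValue != null && !sinksValue.isBlank()) {
            sinkNames.addAll(Arrays.asList(sinksValue.trim().split("\\s+")));
        }
        List<String> unrouted = new ArrayList<>();
        for (String key : props.keySet()) {
            if (key.equals("sinks")) {
                continue;
            }
            boolean routed = COMPONENT_PREFIXES.stream().anyMatch(key::startsWith)
                    || sinkNames.stream().anyMatch(name -> key.startsWith(name + "."));
            if (!routed) {
                unrouted.add(key);
            }
        }
        return unrouted;
    }

    public static void main(String[] args) {
        // A subset of the question's keys; LinkedHashMap keeps insertion order for readable output.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("sinks", "kafkaSink1");
        props.put("processor.type", "load_balance");
        props.put("channel.type", "file");
        props.put("kafkaSink1.type", "KAFKA");
        props.put("kafkaSink.kafka.producer.acks", "-1");
        props.put("use_dual_channel", "true");
        // prints [kafkaSink.kafka.producer.acks, use_dual_channel]
        System.out.println(findUnroutedKeys(props));
    }
}
```

The check is deliberately prefix-only: it says nothing about whether the values are valid, only whether each key could plausibly be attributed to a declared component.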
At runtime it throws the following:
java.lang.NullPointerException
at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133)
at org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40)
at org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161)
at org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99)
at ******************.startService(******************)
Flume then fails to start at all.
The code at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52) shows that it looks up the property sinks, which is not empty in my configuration, so it shouldn't throw this error.
Does anyone know why? I couldn't find any documentation on this.
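The reasoning about line 52 can be illustrated with a simplified sketch. Assuming (not verified against the Flume version used here) that the sink-group step reads `sinks` from a context scoped to the sink group and splits the value without a null check, a non-empty top-level `sinks` entry does not help if the group-scoped lookup comes back empty: `split` is then called on `null`. `parseSinks`, `getString`, and the plain `Map` standing in for Flume's `Context` are hypothetical; this is not Flume's actual code path.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SinkGroupNpeSketch {
    // Hypothetical stand-in for a configuration context: a plain map lookup that
    // returns null when the key is absent (assumed to mirror a getString(key) call).
    static String getString(Map<String, String> groupContext, String key) {
        return groupContext.get(key);
    }

    // Simplified sketch of a sink-group configure step that splits the "sinks" value
    // without a null check. Illustrates the failure mode only; not copied from Flume.
    static List<String> parseSinks(Map<String, String> groupContext) {
        return Arrays.asList(getString(groupContext, "sinks").split("\\s+"));
    }

    public static void main(String[] args) {
        Map<String, String> scoped = new HashMap<>();
        scoped.put("sinks", "kafkaSink1");
        System.out.println(parseSinks(scoped)); // prints [kafkaSink1]

        // The top-level map may define "sinks", but if the group-scoped context
        // does not contain it, the lookup yields null and split() throws.
        Map<String, String> emptyScope = new HashMap<>();
        try {
            parseSinks(emptyScope);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: no 'sinks' key in the group context");
        }
    }
}
```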
Answer 1
Score: 0
In case someone else stumbles upon this issue:
After looking into it further, I believe the Apache Flume project is dead in the water, and we're going to stop using it.
For details, see the following:
- Many open issues with no responses: https://cwiki.apache.org/confluence/display/FLUME/Developer+Section
- The last version was released almost 2 years ago (early on, a new version came out every 6 months): https://flume.apache.org/index.html
- The wiki hasn't even been updated for the latest versions; its last update was more than 5 years ago: https://cwiki.apache.org/confluence/display/FLUME/Home
Sadly, I have to close this question this way.
Hope that helps someone.