Apache Flume Java client fails to start with a single Kafka sink

Question

I am using org.apache.flume.agent.embedded.EmbeddedAgent, configured as follows:

Map<String, String> configurationProperties = ...;
service.configure(configurationProperties);

Where configurationProperties is set with:

{
  "kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000",
  "processor.type": "load_balance",
  "sinks": "kafkaSink1",
  "channel.keep-alive": "0",
  "channel.checkpointDir": "********************************",
  "kafkaSink.kafka.producer.reconnect.backoff.ms": "2000",
  "channel.dataDirs": "********************************",
  "kafkaSink.kafka.producer.retry.backoff.ms": "1000",
  "processor.selector.maxTimeOut": "60000",
  "kafkaSink.kafka.producer.max.request.size": "5485760",
  "kafkaSink1.flumeBatchSize": "1000",
  "kafkaSink.kafka.producer.buffer.memory": "67108864",
  "kafkaSink.kafka.producer.client.id": "********************************",
  "kafkaSink1.useFlumeEventFormat": "true",
  "kafkaSink1.kafka.topic": "********************************",
  "kafkaSink.kafka.producer.batch.size": "8196",
  "channel.kafka.dataDirs": "********************************",
  "kafkaSink1.type": "KAFKA",
  "channel.backupCheckpointDir": "********************************",
  "kafkaSink1.allowTopicOverride": "true",
  "channel.useDualCheckpoints": "true",
  "kafkaSink.kafka.producer.compression.type": "lz4",
  "processor.maxBackoff": "60000",
  "use_dual_channel": "true",
  "channel.capacity": "1000000",
  "channel.byteCapacityBufferPercentage": "50",
  "channel.transactionCapacity": "1000",
  "channel.byteCapacity": "10485760",
  "channel.type": "file",
  "processor.backoff": "true",
  "channel.kafka.checkpointDir": "********************************",
  "channel.kafka.backupCheckpointDir": "********************************",
  "kafkaSink1.kafka.bootstrap.servers": "********************************",
  "kafkaSink.kafka.producer.acks": "-1"
}

At runtime it throws the following:

java.lang.NullPointerException
        at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228)
        at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153)
        at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133)
        at org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40)
        at org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161)
        at org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99)
        at ******************.startService(******************)

After that, Flume does not start at all.

The code at org.apache.flume.conf.sink.SinkGroupConfiguration.configure (SinkGroupConfiguration.java:52) shows that it looks up the property sinks, which is not empty, so it shouldn't throw this error.

Does anyone know why? I could not find any documentation about this.
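
One detail in the map above may be worth checking, although it is not a confirmed cause of the NullPointerException: sinks declares only kafkaSink1, while every kafka.producer.* key is prefixed with kafkaSink (without the 1), and use_dual_channel carries no component prefix at all. Below is a minimal sketch in plain Java (no Flume classes) that lists such keys before the map is passed to configure. The class name SinkPrefixCheck, the method findUnmatchedKeys, and the set of known prefixes are assumptions made for illustration; they are not part of Flume and do not reproduce Flume's own validation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SinkPrefixCheck {

    // Prefixes and bare keys treated as "known" here. This list is an assumption
    // based on the keys in the question, not Flume's actual validation rules.
    private static final Set<String> KNOWN_PREFIXES =
            new HashSet<>(Arrays.asList("channel", "processor", "source"));
    private static final Set<String> KNOWN_KEYS =
            new HashSet<>(Arrays.asList("sinks"));

    /** Returns keys whose first segment is neither a known prefix nor a declared sink name. */
    static List<String> findUnmatchedKeys(Map<String, String> props) {
        // Sink names are read from the whitespace-separated "sinks" value.
        Set<String> declaredSinks = new HashSet<>();
        String sinks = props.get("sinks");
        if (sinks != null && !sinks.trim().isEmpty()) {
            declaredSinks.addAll(Arrays.asList(sinks.trim().split("\\s+")));
        }

        List<String> unmatched = new ArrayList<>();
        for (String key : props.keySet()) {
            if (KNOWN_KEYS.contains(key)) {
                continue;
            }
            int dot = key.indexOf('.');
            String prefix = dot < 0 ? key : key.substring(0, dot);
            // Keys without any prefix, or with an unrecognized one, are reported.
            if (dot < 0 || (!KNOWN_PREFIXES.contains(prefix) && !declaredSinks.contains(prefix))) {
                unmatched.add(key);
            }
        }
        return unmatched;
    }

    public static void main(String[] args) {
        // A small subset of the map from the question.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("sinks", "kafkaSink1");
        props.put("kafkaSink1.type", "KAFKA");
        props.put("kafkaSink.kafka.producer.acks", "-1");
        props.put("channel.type", "file");
        props.put("use_dual_channel", "true");

        for (String key : findUnmatchedKeys(props)) {
            System.out.println("No declared sink or known prefix for: " + key);
        }
    }
}
```

With the subset used in main, the check reports kafkaSink.kafka.producer.acks and use_dual_channel. Whether Flume ignores such keys or trips over them is not something this sketch can tell; it only makes the mismatch visible.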

Answer 1

Score: 0

In case someone stumbles upon this issue:
After looking into it further, I believe the Apache Flume project is dead in the water, and we are going to stop using it.
For details please see the following:

Sadly, I'm going to have to close this question this way.
Hope this helps someone.

  • Posted on 2020-09-07 19:50:40