Apache Flume agent does not save the data in HDFS

Question

I am trying to create an agent with Apache Flume, but I am new to Flume and don't know much about it yet. The agent has to receive data from Netcat and save it in HDFS. For example, the agent will receive rows like these:

1, E1, Eneko, Donostia
1, E2, Ane, Bilbo
2, E3, Julen, Baiona
2, E4, Jack, London

In netcat I can type the rows one by one; that part is not a problem. But if a row begins with the number 1, it must be saved in an HDFS directory called manager; otherwise, it must be saved in another HDFS directory called developer.
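The "otherwise" part of this rule corresponds to the multiplexing selector's default channel. Below is a minimal fragment that assumes the channel names c1 (manager) and c2 (developer) from the configuration below. When `selector.default` is set, every row whose leading digit is not 1 goes to developer. That includes rows starting with 3 and rows with no leading digit at all. By contrast, `mapping.2` on its own only catches rows that start with 2.

```properties
# Route on the "Rola" header written by the regex_extractor interceptor
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = Rola
# Rows whose header value is 1 -> c1 (manager sink)
a1.sources.r1.selector.mapping.1 = c1
# Any other value, or no Rola header at all -> c2 (developer sink)
a1.sources.r1.selector.default = c2
```

This choice has a side effect: if the interceptor is not running, every row ends up in developer instead of disappearing, so a missing header is easier to notice.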

I have written the configuration file below, and the agent starts correctly. I can also send data from netcat, and the agent seems to be listening, since it replies OK. However, no row sent through netcat reaches HDFS: the manager and developer directories I created are always empty.

I created the directories in the HDFS root with the following command:
hadoop fs -mkdir ../../<directory_name>

No errors appear in the log file (/var/log/flume-ng/flume.log).
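The log may stay silent for the following reason, as far as I can tell from how Flume's multiplexing selector works. An event whose header is missing, or whose header matches no mapping, is sent to the selector's default channels. If no `selector.default` is configured, that list is empty, so the event is dropped without any error. The sketch below makes such events visible while debugging. It assumes a hypothetical third channel c3 and a logger sink k3; both names are chosen only for this illustration. To use it, change the three list lines and add the rest temporarily.

```properties
# Hypothetical debugging additions: c3 and k3 are illustrative names
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3
a1.sources.r1.channels = c1 c2 c3

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100

# Events that match no mapping (or carry no Rola header) are routed here
a1.sources.r1.selector.default = c3

# The logger sink writes each event's headers and body to the agent log at INFO level
a1.sinks.k3.type = logger
a1.sinks.k3.channel = c3
```

If the rows then appear in flume.log without a Rola header, the interceptor is not being applied to the source.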

Please help me. I've been checking many things and I don't know what else I can do.

Here is the Apache Flume configuration file:

a1.sources=r1
a1.channels=c1 c2
a1.sinks = k1 k2

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100

a1.sources.r1.interceptors.i1.type=regex_extractor
a1.sources.r1.interceptors.i1.regex= ^(\\d)
a1.sources.r1.interceptors.i1.serializers=s1
a1.sources.r1.interceptors.i1.serializers.s1.name=Rola

a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=Rola
a1.sources.r1.selector.mapping.1=c1
a1.sources.r1.selector.mapping.2=c2

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://localhost:8020/manager
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.fileStream=DataStream

a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path=hdfs://localhost:8020/developer
a1.sinks.k2.hdfs.writeFormat=Text
a1.sinks.k2.hdfs.fileStream=DataStream

a1.sources.r1.channels = c2 c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Answer 1

Score: 0

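The problem was that the interceptor was never registered on the source. The i1 properties were set, but i1 was not listed in a1.sources.r1.interceptors. Once it is listed, everything works correctly: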

a1.sources.r1.interceptors = i1

The interceptor must be defined before the block that uses the interceptor.
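Below is a sketch of the asker's configuration with the answer's line applied. It has not been tested. Besides registering i1, it makes one more change: `hdfs.fileStream` becomes `hdfs.fileType`, because `fileType` (SequenceFile, DataStream or CompressedStream) is the HDFS sink property that selects the output format. With the misspelled key, the sink uses its default SequenceFile format, so the files would not be plain text.

```properties
# Sketch: the question's configuration with the interceptor registered
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Netcat source listening on localhost:44444, feeding both channels
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1 c2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# The missing line: register i1 on the source so it actually runs
a1.sources.r1.interceptors = i1
# Copy the leading digit of each row into the "Rola" header
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = ^(\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = Rola

# Route on the Rola header: 1 -> c1 (manager), 2 -> c2 (developer)
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = Rola
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.mapping.2 = c2

# hdfs.fileType (not fileStream) = DataStream writes plain text
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://localhost:8020/manager
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c2
a1.sinks.k2.hdfs.path = hdfs://localhost:8020/developer
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
```

Even with this configuration, the files in manager and developer carry a .tmp suffix while they are being written. The sink renames them when it rolls the file. By default a roll happens after 30 seconds, 1024 bytes or 10 events, whichever comes first (`hdfs.rollInterval`, `hdfs.rollSize`, `hdfs.rollCount`).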

huangapple
  • Published on 7 March 2023 at 05:17:49
  • If you repost this article, please keep its link: https://go.coder-hub.com/75655940.html