Apache Flume agent does not save the data in HDFS
Question
I am trying to create an agent with Apache Flume, but I am new to this and do not have much experience yet. The agent has to receive data from Netcat and save it in an HDFS file system. The data that the agent will receive will look like this, for example:
1, E1, Eneko, Donostia
1, E2, Ane, Bilbo
2, E3, Julen, Baiona
2, E4, Jack, London
In Netcat I can write the rows one by one; that is not a problem. But if a row begins with the number 1, that row must be saved in an HDFS directory called manager; otherwise it must be saved in another HDFS directory called developer.
I have written the following configuration file and the agent starts correctly. I can also send data from Netcat, and the agent seems to listen correctly, since it returns OK. But no row sent by Netcat reaches HDFS; the directories I created (manager and developer) are always empty.
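For reference, an agent configured in a file like this is normally launched with the flume-ng command; the configuration directory and file name below are only placeholders for whatever is used locally:
flume-ng agent --name a1 --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/a1.conf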
I have created the directories in the HDFS root with the following command:
hadoop fs -mkdir ../../<directory_name>
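For reference, the same directories can also be created and verified against the exact paths that the sinks use, assuming they sit directly under the HDFS root:
hadoop fs -mkdir -p /manager /developer
hadoop fs -ls /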
In the log file (/var/log/flume-ng/flume.log) no error appears.
Please help me. I've been checking many things and I don't know what else I can do.
Here is the Apache Flume configuration file:
a1.sources=r1
a1.channels=c1 c2
a1.sinks = k1 k2
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
a1.sources.r1.interceptors.i1.type=regex_extractor
a1.sources.r1.interceptors.i1.regex= ^(\\d)
a1.sources.r1.interceptors.i1.serializers=s1
a1.sources.r1.interceptors.i1.serializers.s1.name=Rola
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=Rola
a1.sources.r1.selector.mapping.1=c1
a1.sources.r1.selector.mapping.2=c2
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://localhost:8020/manager
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.fileStream=DataStream
a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path=hdfs://localhost:8020/developer
a1.sinks.k2.hdfs.writeFormat=Text
a1.sinks.k2.hdfs.fileStream=DataStream
a1.sources.r1.channels = c2 c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
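For completeness, a quick way to exercise the routing once the agent is running is to pipe a row into the Netcat source and then inspect the target directory; the FlumeData file prefix is only the HDFS sink default and is an assumption here:
echo "1, E1, Eneko, Donostia" | nc localhost 44444
hadoop fs -ls /manager
hadoop fs -cat /manager/FlumeData.*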
Answer 1
Score: 0
The problem was that the interceptor was not defined. Once it was defined, everything worked correctly.
a1.sources.r1.interceptors = i1
The interceptor must be defined before the block that uses it.
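A minimal sketch of the corrected source section, assuming the rest of the posted configuration stays unchanged:
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_extractor
a1.sources.r1.interceptors.i1.regex=^(\\d)
a1.sources.r1.interceptors.i1.serializers=s1
a1.sources.r1.interceptors.i1.serializers.s1.name=Rola
With this line in place, the regex_extractor copies the leading digit of each event into the Rola header, and the multiplexing selector can then route the event to channel c1 or c2.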