HDFS read using multithreading
Question
I am reading files from an HDFS directory using multi-threading with a Producer-Consumer model, leveraging a BlockingQueue.
Here is my code.
Producer class:
public void readURLS() {
    final int capacity = Integer.MAX_VALUE;
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
    try {
        FileSystem hdfs = FileSystem.get(hadoopConf);
        FileStatus[] status = hdfs.listStatus(new Path("MYHDFS_PATH"));
        int i = 0;
        for (FileStatus file : status) {
            LOG.info("Thread {} started: ", i++);
            LOG.info("Reading file {} ", file.getPath().getName());
            new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
        }
    } catch (IOException e) {
        LOG.error("IOException occurred while listing files from HDFS directory");
    }
}
FetchData:
@Override
public void run() {
    LOG.info("Inside reader to start reading the files ");
    try (BufferedReader bufferedReader =
             new BufferedReader(new InputStreamReader(
                 FileSystem.get(hadoopConf).open(file), StandardCharsets.UTF_8))) {
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            LOG.info("Line is :{}", line);
            queue.put(line);
        }
    } catch (IOException e) {
        LOG.error("file : {} ", file.toString());
        throw new IOException(e);
    } catch (InterruptedException e) {
        LOG.error("An error has occurred: ", e);
        Thread.currentThread().interrupt();
    }
}
While executing the code it throws an InterruptedIOException:
java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected
Any idea why? My idea is to loop over each file and read each file using a separate thread.
Answer 1
Score: 2
I'm also getting the same behavior when using HDFS from multiple (many!) threads, and I don't know the answer to the question "why?", but keeping the number of threads accessing HDFS concurrently limited seems to help.
In your case I would recommend using an ExecutorService with a limited number of threads, and fine-tuning that number until you no longer get exceptions.
So, create the ExecutorService (with 10 threads as a starting point):
final ExecutorService executorService = Executors.newFixedThreadPool(10);
and instead of your
new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
do
executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));
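For context, a minimal sketch of how readURLS might look with the fixed pool; it reuses the LOG, hadoopConf, queue, and FetchData from the question, and the shutdown() call in the finally block is my addition so the pool stops accepting work once all files have been submitted:
// Sketch only: same producer logic as in the question, but submitting tasks
// to a bounded pool instead of starting one raw thread per file.
public void readURLS() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
        FileSystem hdfs = FileSystem.get(hadoopConf);
        FileStatus[] status = hdfs.listStatus(new Path("MYHDFS_PATH"));
        for (FileStatus file : status) {
            LOG.info("Submitting file {} ", file.getPath().getName());
            executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));
        }
    } catch (IOException e) {
        LOG.error("IOException occurred while listing files from HDFS directory", e);
    } finally {
        // No new tasks are accepted after this point; already-submitted FetchData tasks keep running.
        executorService.shutdown();
    }
}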
Another improvement: since org.apache.hadoop.fs.FileSystem implements Closeable, you should close it. In your code every thread creates a new instance of FileSystem but does not close it. So I would extract it into a variable inside your try:
try (FileSystem fileSystem = FileSystem.get(hadoopConf);
     BufferedReader bufferedReader =
         new BufferedReader(new InputStreamReader(
             fileSystem.open(file), StandardCharsets.UTF_8))) {
UPDATE:
Although the above code seems to be the right approach for Closeable objects, by default FileSystem.get will return cached instances from its internal cache:
/** FileSystem cache */
static final Cache CACHE = new Cache();
and thus things will break horribly when close() is called on them.
You could either disable the FileSystem cache by setting the fs.hdfs.impl.disable.cache config param to true, or make sure the FileSystem instance(s) are only closed when all workers have finished. It also seems that you could just use a single instance of FileSystem for all your workers, although I can't find any confirmation in the javadocs that this will work properly without extra synchronisation.
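If you go with a single shared instance, here is a hedged sketch under the following assumptions: FetchData is assumed to take a FileSystem instead of a Configuration (a hypothetical variant, not the constructor shown above), the 10-thread pool and one-hour timeout are arbitrary, and queue, hadoopConf, and LOG are the ones from the question:
// Sketch only: one FileSystem shared by all workers, closed only after the pool drains.
// Alternative approach: keep per-worker instances and disable the cache first, e.g.
// hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true);
ExecutorService executorService = Executors.newFixedThreadPool(10);
try (FileSystem hdfs = FileSystem.get(hadoopConf)) {
    for (FileStatus file : hdfs.listStatus(new Path("MYHDFS_PATH"))) {
        // Hypothetical FetchData variant that reads through the shared FileSystem.
        executorService.submit(new FetchData(queue, file.getPath(), hdfs));
    }
    executorService.shutdown();
    // Block until every reader has finished before try-with-resources closes the FileSystem.
    // (The boolean return value is ignored here for brevity.)
    executorService.awaitTermination(1, TimeUnit.HOURS);
} catch (IOException e) {
    LOG.error("Failed while listing or reading from HDFS", e);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}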
Comments