HDFS read using multithreading

Question

I am reading files from an HDFS directory with multiple threads, using a producer-consumer model built on a BlockingQueue.

Here is my code.

Producer class:

public void readURLS() {
    final int capacity = Integer.MAX_VALUE;
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
    try {
        FileSystem hdfs = FileSystem.get(hadoopConf);
        FileStatus[] status = hdfs.listStatus(new Path("MYHDFS_PATH"));
        int i = 0;
        for (FileStatus file : status) {
            LOG.info("Thread {} started: ", i++);
            LOG.info("Reading file {} ", file.getPath().getName());
            new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
        }
    } catch (IOException e) {
        LOG.error("IOException occurred while listing files from HDFS directory");
    }
}

FetchData class:

@Override
public void run() {
    LOG.info("Inside reader to start reading the files ");
    try (BufferedReader bufferedReader =
            new BufferedReader(new InputStreamReader
                    (FileSystem.get(hadoopConf).open(file), StandardCharsets.UTF_8))) {
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            LOG.info("Line is :{}", line);
            queue.put(line);
        }
    } catch (IOException e) {
        LOG.error("file : {} ", file.toString());
        throw new IOException(e);
    } catch (InterruptedException e) {
        LOG.error("An error has occurred: ", e);
        Thread.currentThread().interrupt();
    }
}

When I run the code, it throws an InterruptedIOException:

java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected 

Any idea why? My plan is to loop over the files and read each one in a separate thread.

Answer 1

Score: 2

I see the same behavior when using HDFS from multiple (many!) threads, and I do not know the answer to the question "why?", but limiting the number of threads that access HDFS concurrently seems to help.

In your case I would recommend using an ExecutorService with a limited number of threads, and tuning that number up to the highest value at which you still do not get exceptions.

So, create the ExecutorService (with 10 threads as a starting point):

final ExecutorService executorService = Executors.newFixedThreadPool(10);

and instead of your

new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();

do

executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));
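Putting these pieces together, a minimal sketch of the bounded-pool approach might look like the following. It uses only the JDK so that it runs on its own: LineProducer is a hypothetical stand-in for FetchData that takes in-memory lines instead of an HDFS Path, and the class name, the pool size and the one-minute timeout are assumptions for illustration, not values from the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedPoolSketch {

    /** Hypothetical stand-in for FetchData: puts every line of one "file" on the queue. */
    static final class LineProducer implements Runnable {
        private final BlockingQueue<String> queue;
        private final List<String> lines;

        LineProducer(BlockingQueue<String> queue, List<String> lines) {
            this.queue = queue;
            this.lines = lines;
        }

        @Override
        public void run() {
            try {
                for (String line : lines) {
                    queue.put(line); // blocks only if the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
            }
        }
    }

    /** Runs one producer per "file" on at most poolSize threads; returns the number of queued lines. */
    static int produceAll(List<List<String>> files, int poolSize) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (List<String> file : files) {
            pool.submit(new LineProducer(queue, file));
        }
        pool.shutdown(); // accept no new tasks; submitted tasks still run
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow(); // interrupts workers that are still running
        }
        return queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> files = new ArrayList<>();
        for (int f = 0; f < 25; f++) {
            files.add(Arrays.asList("line-a-" + f, "line-b-" + f));
        }
        // 25 "files" are processed, but never more than 10 at the same time.
        System.out.println("queued lines: " + produceAll(files, 10));
    }
}
```

Note that shutdownNow() interrupts the worker threads, so with real HDFS readers it is probably best kept as a last resort after a generous timeout.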

Another improvement: since org.apache.hadoop.fs.FileSystem implements Closeable, you should close it. In your code every thread obtains a FileSystem instance but never closes it. So I would extract it into a variable inside your try:

try (FileSystem fileSystem = FileSystem.get(hadoopConf);
        BufferedReader bufferedReader =
                new BufferedReader(new InputStreamReader
                        (fileSystem.open(file), StandardCharsets.UTF_8))) {

UPDATE:

Although the code above looks like the right approach for Closeable objects, by default FileSystem.get returns cached instances from the

/** FileSystem cache */
static final Cache CACHE = new Cache();

and thus things will break horribly when close() is called on them.

You could either disable the FileSystem cache by setting the fs.hdfs.impl.disable.cache config param to true, or make sure the FileSystem instance(s) are only closed when all workers have finished. It also seems that you could just use a single instance of FileSystem for all your workers, although I can't find any confirmation in the javadocs that this will work properly without extra synchronisation.
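The second option (closing the shared instance only after all workers have finished) could be sketched roughly as follows. This is a JDK-only illustration under stated assumptions: SharedHandle is a hypothetical stand-in for a cached FileSystem that refuses to work after close(), and it says nothing about whether a real FileSystem is safe to share between threads.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedHandleSketch {

    /** Hypothetical stand-in for a cached FileSystem: fails if used after close(). */
    static final class SharedHandle implements Closeable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        String read(int fileIndex) {
            if (closed.get()) {
                throw new IllegalStateException("handle already closed");
            }
            return "contents-" + fileIndex;
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Reads fileCount "files" through one shared handle; returns how many reads succeeded. */
    static int readAll(int fileCount, int poolSize) throws InterruptedException {
        AtomicInteger succeeded = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try (SharedHandle handle = new SharedHandle()) {
            for (int i = 0; i < fileCount; i++) {
                final int fileIndex = i;
                pool.submit(() -> {
                    handle.read(fileIndex); // throws if the handle was closed too early
                    succeeded.incrementAndGet();
                });
            }
            pool.shutdown();
            // Waiting inside the try block means close() runs only after the wait returns.
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow(); // on timeout, stragglers may still see a closed handle
            }
        }
        return succeeded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("successful reads: " + readAll(20, 4));
    }
}
```

The point of the design is that the workers never call close() themselves; only the code that owns the pool does, and only after awaitTermination has returned.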

huangapple
  • Posted on 2020-08-04 04:29:41
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63236537.html